You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by olegz <gi...@git.apache.org> on 2016/03/11 18:31:49 UTC

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/271

    NIFI-1571 initial commit of SpringContext support

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-1571

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #271
    
----
commit 82bc3ceb16e1f1c237285da227c79fef4441e2f2
Author: Oleg Zhurakousky <ol...@suitcase.io>
Date:   2016-03-02T18:35:26Z

    NIFI-1571 initial commit of SpringContext support

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-198126926
  
    Also, the Camel example was added for testing here https://github.com/olegz/si-demo .@PuspenduBanerjee if/when you get a chance please check it out. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56596211
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,448 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_PATH = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +.addValidator(new SpringContextConfigValidator())
    --- End diff --
    
    minor style issue: indenting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56446521
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Actually this property is usually defined before the rest of the class path which means that even with some trickery it would not be possible to do until the next property (classpath) is defined.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-198126662
  
    @trkurc @joewitt final commit is in. Everything is squashed so give it a fresh look. Most importantly the validation suggestion that was floating around last night wasn't difficult after all, so it's in.
    LMK


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56103351
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/docs/org.apache.nifi.spring.SpringContextProcessor/additionalDetails.html ---
    @@ -0,0 +1,94 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +    <!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +    <head>
    +        <meta charset="utf-8" />
    +        <title>SpringContextProcessor</title>
    +        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
    +    </head>
    +
    +    <body>
    +        <!-- Processor Documentation ================================================== -->
    +        <h2>Description:</h2>
    +        <p>
    +            <b>SpringContextProcessor</b> – allows integration of processes encapsulated in Spring Application Context to run as NiFi 
    +            processor by becoming a runtime host for an instance of Spring Application Context.  
    +        </p>
    +        <p>
    +            Communication between NiFi and process encapsulated within Spring Application Context is accomplished via Spring Messaging 
    +            (one of the core modules of Spring Framework) and supports 3 usage modes:
    +            <ul>
    +            	<li><i>Headless</i> - no interaction with NiFi, meaning nothing is sent to it and nothing is received from it (i.e., some monitoring app).  
    +            	In this case NiFi simply plays the role of the runtime host.</li>
    +            	<li><i>One way (NiFi -&gt; Spring or Spring -&gt; NiFi). </i> - This depends on existence of pre-defined message channel in Spring 
    +            	Application Context. The name of the channel should be “fromNiFi” and the type <i>org.springframework.messaging.MessageChannel.</i></li>
    +            	<li><i>By-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt; Spring)</i> - This depends on existence of two channels  
    +            	in Spring Application Context. One channel receives messages from NiFi with name “fromNiFi” and type <i>org.springframework.messaging.MessageChannel</i>i>
    +            	 and another is o receive messages from Spring with name “toNiFi” and type <i>org.springframework.messaging.PollableChannel.</i></li>
    +            </ul>
    +            The example below demonstrates template configuration for bi-directional Spring Application Context configuration:
    +            <code>
    +            <pre>
    +    &lt;int:channel id=”fromNiFi”/&gt;
    +    
    +    &lt;!— 
    +    your custom app configuration to receive messages from ‘fromNiFi’ channel and optionally send back to NiFi via ‘toNiFi’ channel. 
    +    It could contain any Spring-based application (i.e., Spring Integration, Apache Camel and/or custom code). All you need to do is inject 
    +    channels into your beans and send/receive messages from it.
    +    --&gt;
    +    
    +    &lt;int:channel id="toNiFi"&gt;
    +		&lt;int:queue/&gt;
    +    &lt;/int:channel&gt;
    +            </pre>
    +            </code>
    +        </p>
    +        <p>
    +        The component is based on assumption that user has an existing Spring Application encapsulated in Spring Context that exposes optional in/out 
    +        MessagingChannels to allow data to flow to/from ApplicationContext and into/out-of. NiFi. 
    +        Such application is realized by having a directory on the file system, which contains contains all required resources for such application to run.
    +        Such resources usually are JAR files to satisfy application's class-path as well as JAR representing the application and its configuration.
    +        Below is the example of what such directory may contain. In this case the 'SI_DEMO-0.0.1-SNAPSHOT.jar' represents the actual application and the rest 
    +        of the JARs represent class-path dependency required by an application.
    +        <pre>
    +        deps
    +         ├── SI_DEMO-0.0.1-SNAPSHOT.jar
    +         ├── aopalliance-1.0.jar
    +         ├── commons-logging-1.2.jar
    +         ├── spring-aop-4.2.4.RELEASE.jar
    +         ├── spring-beans-4.2.4.RELEASE.jar
    +         ├── spring-context-4.2.4.RELEASE.jar
    +         ├── spring-core-4.2.4.RELEASE.jar
    +         ├── spring-expression-4.2.4.RELEASE.jar
    +         ├── spring-integration-core-4.2.5.RELEASE.jar
    +         ├── spring-messaging-4.2.4.RELEASE.jar
    +        </pre>
    +        </p>
    +        <p>
    +        You introduce the processor the usual way and then configure its properties:
    +        <ul>
    +        <li><i><b>Application Context config path</b></i> [REQUIRED] - a path to the Application Context configuration. 
    +        The path is relative to the class-path of the application defined by the <i>Application Context class path</i> property </li>
    +        <li><i><b>Application Context class path</b></i> [REQUIRED] - a path to a directory on the file system where application dependencies are. </li>
    +        <li><i>Send Timeout</i> [OPTIONAL] - the wait time for sending messages to Spring Application Context. Only required if NiFi plans to send data o Spring. 
    --- End diff --
    
    Typo: s/ o / to /


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56103524
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor CTX_LIB_PATH = new PropertyDescriptor.Builder()
    +            .name("Application Context class path")
    +            .description("Path to the directory with resources (i.e., JARs, configuration files etc.) required to be on "
    +                            + "the classpath of the ApplicationContext.")
    +            .addValidator(new ClientLibValidator())
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor SEND_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Send Timeout")
    +            .description("Timeout for sending data to Spring Application Context. Defaults to 0.")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Receive Timeout")
    +            .description("Timeout for receiving date from Spring context. Defaults to 0.")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    // ====
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description(
    +                    "All FlowFiles that are successfully received from Spring Application Context are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be sent to Spring Application Context are routed to this relationship")
    +            .build();
    +
    +    private final static Set<Relationship> relationships;
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // =======
    +
    +    private volatile String applicationContextConfigFileName;
    +
    +    private volatile String applicationContextLibPath;
    +
    +    private volatile long sendTimeout;
    +
    +    private volatile long receiveTimeout;
    +
    +    private volatile SpringDataExchanger exchanger;
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(CTX_CONFIG_NAME);
    +        _propertyDescriptors.add(CTX_LIB_PATH);
    +        _propertyDescriptors.add(SEND_TIMEOUT);
    +        _propertyDescriptors.add(RECEIVE_TIMEOUT);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void initializeSpringContext(ProcessContext processContext) {
    +        this.applicationContextConfigFileName = processContext.getProperty(CTX_CONFIG_NAME).getValue();
    +        this.applicationContextLibPath = processContext.getProperty(CTX_LIB_PATH).getValue();
    +
    +        String stStr = processContext.getProperty(SEND_TIMEOUT).getValue();
    +        this.sendTimeout = stStr == null ? 0 : FormatUtils.getTimeDuration(stStr, TimeUnit.MILLISECONDS);
    +
    +        String rtStr = processContext.getProperty(RECEIVE_TIMEOUT).getValue();
    +        this.receiveTimeout = rtStr == null ? 0 : FormatUtils.getTimeDuration(rtStr, TimeUnit.MILLISECONDS);
    +
    +        try {
    +            if (logger.isDebugEnabled()) {
    +                logger.debug(
    +                        "Initializing Spring Application Context defined in " + this.applicationContextConfigFileName);
    +            }
    +            this.exchanger = SpringContextFactory.createSpringContextDelegate(this.applicationContextLibPath,
    +                    this.applicationContextConfigFileName);
    +        } catch (Exception e) {
    +            throw new IllegalStateException("Failed while initializing Spring Application Context", e);
    +        }
    +        if (logger.isInfoEnabled()) {
    +            logger.info("Successfully initialized Spring Application Context defined in "
    +                    + this.applicationContextConfigFileName);
    +        }
    +    }
    +
    +    /**
    +     * Will close the 'exchanger' which in turn will close both Spring
    +     * Application Context and the ClassLoader that loaded it allowing new
    +     * instance of Spring Application Context to be created upon the next start
    +     * (which may have an updated classpath and functionality) without
    +     * restarting NiFi.
    +     */
    +    @OnStopped
    +    public void closeSpringContext(ProcessContext processContext) {
    +        if (this.exchanger != null) {
    +            try {
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug(
    +                            "Closing Spring Application Context defined in " + this.applicationContextConfigFileName);
    +                }
    +                this.exchanger.close();
    +                if (logger.isInfoEnabled()) {
    +                    logger.info("Successfully closed Spring Application Context defined in "
    +                            + this.applicationContextConfigFileName);
    +                }
    +            } catch (IOException e) {
    +                getLogger().warn("Failed while closing Spring Application Context", e);
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        FlowFile flowFileToProcess = processSession.get();
    +        if (flowFileToProcess != null) {
    +            this.sendToSpring(flowFileToProcess, context, processSession);
    --- End diff --
    
    @olegz feels like perhaps we should catch some sort of exception from here and route the flow file to a failure relationship.  If not then it means anytime a send to spring causes an exception we will rollback the session.  It is *possibly* better to instead have a failure relationship where we can penalize flow files and send them in the event they are things we send to spring and get a failure in doing so.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56446243
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    I assume the file is located with something like ClassLoader.getResource()? That might make a good validator on its own, whether for 0.6 or the future...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-196846398
  
    @joewitt @trkurc PR comments are addressed, please review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56106623
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/bootstrap/SpringContextDelegate.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring.bootstrap;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.net.URLClassLoader;
    +import java.util.Map;
    +
    +import org.apache.nifi.spring.SpringDataExchanger;
    +import org.apache.nifi.spring.SpringNiFiConstants;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.context.support.ClassPathXmlApplicationContext;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +import org.springframework.messaging.support.MessageBuilder;
    +
    +/**
    + * Scopes instance of itself to a dedicated {@link ClassLoader}, thus allowing
    + * Spring Application Context and its class path to be modified and refreshed by
    + * simply re-starting SpringContextProcessor. Also ensures that there are no
    + * class path collisions between multiple instances of Spring Context Processor
    + * which are loaded by the same NAR Class Loader.
    + */
    +/*
    + * This class is for internal use only and must never be instantiated by the NAR
    + * Class Loader (hence in a isolated package with nothing referencing it). It is
    + * loaded by a dedicated CL via byte array that represents it ensuring that this
    + * class can be loaded multiple times by multiple Class Loaders within a single
    + * instance of NAR.
    + */
    +final class SpringContextDelegate implements Closeable, SpringDataExchanger {
    +
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextDelegate.class);
    +
    +    private final ClassPathXmlApplicationContext applicationContext;
    +
    +    private final MessageChannel toSpringChannel;
    +
    +    private final PollableChannel fromSpringChannel;
    +
    +    /**
    +     *
    +     */
    +    private SpringContextDelegate(String configName) {
    +        ClassLoader orig = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +        if (logger.isDebugEnabled()) {
    +            logger.debug("Using " + Thread.currentThread().getContextClassLoader()
    +                    + " as context class loader while loading Spring Context '" + configName + "'.");
    +        }
    +        try {
    +            this.applicationContext = new ClassPathXmlApplicationContext(configName);
    +            if (this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI)){
    +                this.toSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class);
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug("Spring Application Context defined in '" + configName
    +                            + "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered.");
    +                }
    +            } else {
    +                this.toSpringChannel = null;
    +            }
    +            if (this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI)){
    +                this.fromSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class);
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug("Spring Application Context defined in '" + configName
    +                            + "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered.");
    +                }
    +            } else {
    +                this.fromSpringChannel = null;
    +            }
    +            if (logger.isInfoEnabled() && this.toSpringChannel == null && this.fromSpringChannel == null){
    +                logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. "
    +                        + "No data will be exchanged.");
    +            }
    +        } finally {
    +            Thread.currentThread().setContextClassLoader(orig);
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public <T> boolean send(T payload, Map<String, ?> messageHeaders, long timeout) {
    +        if (this.toSpringChannel != null){
    +            return this.toSpringChannel.send(MessageBuilder.withPayload(payload).copyHeaders(messageHeaders).build(), timeout);
    +        }
    +        return false;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @SuppressWarnings("unchecked")
    +    @Override
    +    public <T> SpringResponse<T> receive(long timeout) {
    +        if (this.fromSpringChannel != null) {
    +            final Message<T> message = (Message<T>) this.fromSpringChannel.receive(timeout);
    +            if (message != null) {
    +                if (!(message.getPayload() instanceof byte[]) && !(message.getPayload() instanceof String)) {
    +                    throw new IllegalStateException("Failed while receiving message from Spring due to the "
    +                            + "payload type being other then byte[] which is currently not supported. Please "
    --- End diff --
    
    Yep. The '||' was a late addition, way after the message


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-197617908
  
    @olegz : checkstyle fails when I build with your latest commit
    ``
    [INFO] --- maven-checkstyle-plugin:2.15:check (check-style) @ nifi-spring-processors ---
    [WARNING] src/main/java/org/apache/nifi/spring/SpringContextFactory.java[44] (regexp) RegexpSinglelineJava: Line has trailing whitespace.
    [WARNING] src/main/java/org/apache/nifi/spring/SpringContextFactory.java[51] (regexp) RegexpSinglelineJava: Line has trailing whitespace.
    [WARNING] src/main/java/org/apache/nifi/spring/SpringContextFactory.java[61] (regexp) RegexpSinglelineJava: Line has trailing whitespace.
    [WARNING] src/test/java/org/apache/nifi/spring/SpringContextFactoryTests.java[20:15] (imports) UnusedImports: Unused import - org.junit.Assert.assertFalse.
    ``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-197621914
  
    Sorry, aside from style check there are still few things I want to do, so WIP for the next 11 hours ;). But I mainly wanted to see if you can play around with the sample app. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by PuspenduBanerjee <gi...@git.apache.org>.
Github user PuspenduBanerjee commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-198245248
  
    @olegz Sorry for the delay in response, I shall look into that during next week.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56446675
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Sounds like a job for https://issues.apache.org/jira/browse/NIFI-1121 (someday)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56596981
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,448 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_PATH = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +.addValidator(new SpringContextConfigValidator())
    --- End diff --
    
    Ok, just addressed the white space comments and this one. Let me know when you're done so I can make one last push (hopefully).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-197121747
  
    @trkurc Without knowing how much spring experience you have it's hard to say, but I do have a sample app which I use for testing that I can push to my github and you (anyone) can use to play around. Will do it tomorrow after some other pressing things. Also, I plan to add one minor feature. Will describe it in the next commit message as well as updated docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56446641
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Even if we were to switch the order of the properties, unless it could be enforced there is no joy. So I say leave it and let it fail at the start.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56508487
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    @trkurc I hate to say no as I do believe it would be valuable, so I may have found a way that is relatively easy. Stay tuned.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56104123
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextFactory.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.InputStream;
    +import java.lang.reflect.Constructor;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Helper class which provides factory method to create and initialize Spring
    + * Application Context while scoping it within the dedicated Class Loader.
    + */
    +final class SpringContextFactory {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(SpringContextFactory.class);
    +
    +    private static final String SC_DELEGATE_NAME = "org.apache.nifi.spring.bootstrap.SpringContextDelegate";
    +
    +    /**
    +     * Creates and instance of Spring Application Context scoped within a
    +     * dedicated Class Loader
    +     */
    +    static SpringDataExchanger createSpringContextDelegate(String classpath, String config) {
    +        System.out.println(SpringContextFactory.class.getClassLoader());
    +        URL[] urls = gatherAdditionalClassPathUrls(classpath);
    +        SpringContextClassLoader contextCl = new SpringContextClassLoader(urls,
    +                SpringContextFactory.class.getClassLoader());
    +        try {
    +            InputStream delegateStream = contextCl.getResourceAsStream(SC_DELEGATE_NAME.replace('.', '/') + ".class");
    +            byte[] delegateBytes = IOUtils.toByteArray(delegateStream);
    +            Class<?> clazz = contextCl.doDefineClass(SC_DELEGATE_NAME, delegateBytes, 0, delegateBytes.length);
    +            Constructor<?> ctr = clazz.getDeclaredConstructor(String.class);
    +            ctr.setAccessible(true);
    +            SpringDataExchanger springDelegate = (SpringDataExchanger) ctr.newInstance(config);
    +            if (logger.isInfoEnabled()) {
    +                logger.info("Successfully instantiated Spring Application Context from '" + config + "'");
    +            }
    +            return springDelegate;
    +        } catch (Exception e) {
    +            try {
    +                contextCl.close();
    +            } catch (Exception e2) {
    +                // ignore
    +            }
    +            throw new IllegalStateException("Failed to instantiate Spring Application Context. Config path: '" + config
    +                    + "'; Classpath: " + Arrays.asList(urls), e);
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    private static URL[] gatherAdditionalClassPathUrls(String path) {
    +        if (logger.isDebugEnabled()) {
    +            logger.debug("Adding additional resources from '" + path + "' to the classpath.");
    +        }
    +        File libraryDir = new File(path);
    +        if (libraryDir.exists() && libraryDir.isDirectory()) {
    +            String[] cpResourceNames = libraryDir.list();
    +            try {
    +                URLClassLoader thisCl = (URLClassLoader) SpringContextFactory.class.getClassLoader();
    +                List<URL> urls = new ArrayList<>();
    +                for (int i = 0; i < cpResourceNames.length; i++) {
    +                    if (!isDuplicate(thisCl.getURLs(), cpResourceNames[i])) {
    +                        URL url = new File(libraryDir, cpResourceNames[i]).toURI().toURL();
    +                        urls.add(url);
    +                        if (logger.isDebugEnabled()) {
    +                            logger.debug("Identifying additional resource to the classpath: " + url);
    +                        }
    +                    }
    +                }
    +                return urls.toArray(new URL[] {});
    +            } catch (Exception e) {
    +                throw new IllegalStateException(
    +                        "Failed to parse user libraries from '" + libraryDir.getAbsolutePath() + "'", e);
    +            }
    +        } else {
    +            throw new IllegalArgumentException("Path '" + libraryDir.getAbsolutePath()
    +                    + "' is not valid because it doesn't exist or does not point to a directory.");
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    static boolean isDuplicate(URL[] currentURLs, String resourceName) {
    +        if (resourceName.startsWith("spring")) {
    +            resourceName = resourceName.substring(0, resourceName.lastIndexOf("-"));
    +        }
    +        for (URL eURL : currentURLs) {
    +            if (eURL.getPath().contains(resourceName)) {
    --- End diff --
    
    I tried reasoning about this for a bit, but couldn't you report duplicates that are not actual duplicates if you have an resourceName that is just an unfortunate substring of a url? Or is this very unlikely to occur?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-198171253
  
    @olegz, I think @joewitt is still looking, but I think the code looks good, I put it through the paces using several applications. I'm +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-197119997
  
    @oleg: changes look good. do you have a recommendation for building a sample "application" for testing other than adapting what is in the unit tests?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56440539
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    may be helpful to add a validator that this file exists?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56105879
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/resources/docs/org.apache.nifi.spring.SpringContextProcessor/additionalDetails.html ---
    @@ -0,0 +1,94 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +    <!--
    +      Licensed to the Apache Software Foundation (ASF) under one or more
    +      contributor license agreements.  See the NOTICE file distributed with
    +      this work for additional information regarding copyright ownership.
    +      The ASF licenses this file to You under the Apache License, Version 2.0
    +      (the "License"); you may not use this file except in compliance with
    +      the License.  You may obtain a copy of the License at
    +          http://www.apache.org/licenses/LICENSE-2.0
    +      Unless required by applicable law or agreed to in writing, software
    +      distributed under the License is distributed on an "AS IS" BASIS,
    +      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +      See the License for the specific language governing permissions and
    +      limitations under the License.
    +    -->
    +    <head>
    +        <meta charset="utf-8" />
    +        <title>SpringContextProcessor</title>
    +        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
    +    </head>
    +
    +    <body>
    +        <!-- Processor Documentation ================================================== -->
    +        <h2>Description:</h2>
    +        <p>
    +            <b>SpringContextProcessor</b> – allows integration of processes encapsulated in Spring Application Context to run as NiFi 
    +            processor by becoming a runtime host for an instance of Spring Application Context.  
    +        </p>
    +        <p>
    +            Communication between NiFi and process encapsulated within Spring Application Context is accomplished via Spring Messaging 
    +            (one of the core modules of Spring Framework) and supports 3 usage modes:
    +            <ul>
    +            	<li><i>Headless</i> - no interaction with NiFi, meaning nothing is sent to it and nothing is received from it (i.e., some monitoring app).  
    +            	In this case NiFi simply plays the role of the runtime host.</li>
    +            	<li><i>One way (NiFi -&gt; Spring or Spring -&gt; NiFi). </i> - This depends on existence of pre-defined message channel in Spring 
    +            	Application Context. The name of the channel should be “fromNiFi” and the type <i>org.springframework.messaging.MessageChannel.</i></li>
    +            	<li><i>By-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt; Spring)</i> - This depends on existence of two channels  
    +            	in Spring Application Context. One channel receives messages from NiFi with name “fromNiFi” and type <i>org.springframework.messaging.MessageChannel</i>i>
    +            	 and another is o receive messages from Spring with name “toNiFi” and type <i>org.springframework.messaging.PollableChannel.</i></li>
    --- End diff --
    
    another typo: s/ o / to / ?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56104556
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextFactory.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.InputStream;
    +import java.lang.reflect.Constructor;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Helper class which provides factory method to create and initialize Spring
    + * Application Context while scoping it within the dedicated Class Loader.
    + */
    +final class SpringContextFactory {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(SpringContextFactory.class);
    +
    +    private static final String SC_DELEGATE_NAME = "org.apache.nifi.spring.bootstrap.SpringContextDelegate";
    +
    +    /**
    +     * Creates and instance of Spring Application Context scoped within a
    +     * dedicated Class Loader
    +     */
    +    static SpringDataExchanger createSpringContextDelegate(String classpath, String config) {
    +        System.out.println(SpringContextFactory.class.getClassLoader());
    +        URL[] urls = gatherAdditionalClassPathUrls(classpath);
    +        SpringContextClassLoader contextCl = new SpringContextClassLoader(urls,
    +                SpringContextFactory.class.getClassLoader());
    +        try {
    +            InputStream delegateStream = contextCl.getResourceAsStream(SC_DELEGATE_NAME.replace('.', '/') + ".class");
    +            byte[] delegateBytes = IOUtils.toByteArray(delegateStream);
    +            Class<?> clazz = contextCl.doDefineClass(SC_DELEGATE_NAME, delegateBytes, 0, delegateBytes.length);
    +            Constructor<?> ctr = clazz.getDeclaredConstructor(String.class);
    +            ctr.setAccessible(true);
    +            SpringDataExchanger springDelegate = (SpringDataExchanger) ctr.newInstance(config);
    +            if (logger.isInfoEnabled()) {
    +                logger.info("Successfully instantiated Spring Application Context from '" + config + "'");
    +            }
    +            return springDelegate;
    +        } catch (Exception e) {
    +            try {
    +                contextCl.close();
    +            } catch (Exception e2) {
    +                // ignore
    +            }
    +            throw new IllegalStateException("Failed to instantiate Spring Application Context. Config path: '" + config
    +                    + "'; Classpath: " + Arrays.asList(urls), e);
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    private static URL[] gatherAdditionalClassPathUrls(String path) {
    +        if (logger.isDebugEnabled()) {
    +            logger.debug("Adding additional resources from '" + path + "' to the classpath.");
    +        }
    +        File libraryDir = new File(path);
    +        if (libraryDir.exists() && libraryDir.isDirectory()) {
    +            String[] cpResourceNames = libraryDir.list();
    +            try {
    +                URLClassLoader thisCl = (URLClassLoader) SpringContextFactory.class.getClassLoader();
    +                List<URL> urls = new ArrayList<>();
    +                for (int i = 0; i < cpResourceNames.length; i++) {
    +                    if (!isDuplicate(thisCl.getURLs(), cpResourceNames[i])) {
    +                        URL url = new File(libraryDir, cpResourceNames[i]).toURI().toURL();
    +                        urls.add(url);
    +                        if (logger.isDebugEnabled()) {
    +                            logger.debug("Identifying additional resource to the classpath: " + url);
    +                        }
    +                    }
    +                }
    +                return urls.toArray(new URL[] {});
    +            } catch (Exception e) {
    +                throw new IllegalStateException(
    +                        "Failed to parse user libraries from '" + libraryDir.getAbsolutePath() + "'", e);
    +            }
    +        } else {
    +            throw new IllegalArgumentException("Path '" + libraryDir.getAbsolutePath()
    +                    + "' is not valid because it doesn't exist or does not point to a directory.");
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    static boolean isDuplicate(URL[] currentURLs, String resourceName) {
    +        if (resourceName.startsWith("spring")) {
    +            resourceName = resourceName.substring(0, resourceName.lastIndexOf("-"));
    --- End diff --
    
    not sure I read the code flow correct, but if I have a jar in my application class path named springtony.jar, will this blow up?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/271


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-198134497
  
    @olegz - there are a bunch of trailing whitespace and whitespace on blank lines in the xml and properties files. checkstyle doesn't complain as it checks the java code only, but it did cause warnings when I applied the patch


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-197434936
  
    @trkurc you can use the simple app here: https://github.com/olegz/si-demo. It has some initial docs and is ready to be used as is after 'mvn clean install', but you can also use it as template and change code.
    Initial docs are in its README


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56106464
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextFactory.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.InputStream;
    +import java.lang.reflect.Constructor;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Helper class which provides factory method to create and initialize Spring
    + * Application Context while scoping it within the dedicated Class Loader.
    + */
    +final class SpringContextFactory {
    +
    +    private static final Logger logger = LoggerFactory.getLogger(SpringContextFactory.class);
    +
    +    private static final String SC_DELEGATE_NAME = "org.apache.nifi.spring.bootstrap.SpringContextDelegate";
    +
    +    /**
    +     * Creates and instance of Spring Application Context scoped within a
    +     * dedicated Class Loader
    +     */
    +    static SpringDataExchanger createSpringContextDelegate(String classpath, String config) {
    +        System.out.println(SpringContextFactory.class.getClassLoader());
    +        URL[] urls = gatherAdditionalClassPathUrls(classpath);
    +        SpringContextClassLoader contextCl = new SpringContextClassLoader(urls,
    +                SpringContextFactory.class.getClassLoader());
    +        try {
    +            InputStream delegateStream = contextCl.getResourceAsStream(SC_DELEGATE_NAME.replace('.', '/') + ".class");
    +            byte[] delegateBytes = IOUtils.toByteArray(delegateStream);
    +            Class<?> clazz = contextCl.doDefineClass(SC_DELEGATE_NAME, delegateBytes, 0, delegateBytes.length);
    +            Constructor<?> ctr = clazz.getDeclaredConstructor(String.class);
    +            ctr.setAccessible(true);
    +            SpringDataExchanger springDelegate = (SpringDataExchanger) ctr.newInstance(config);
    +            if (logger.isInfoEnabled()) {
    +                logger.info("Successfully instantiated Spring Application Context from '" + config + "'");
    +            }
    +            return springDelegate;
    +        } catch (Exception e) {
    +            try {
    +                contextCl.close();
    +            } catch (Exception e2) {
    +                // ignore
    +            }
    +            throw new IllegalStateException("Failed to instantiate Spring Application Context. Config path: '" + config
    +                    + "'; Classpath: " + Arrays.asList(urls), e);
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    private static URL[] gatherAdditionalClassPathUrls(String path) {
    +        if (logger.isDebugEnabled()) {
    +            logger.debug("Adding additional resources from '" + path + "' to the classpath.");
    +        }
    +        File libraryDir = new File(path);
    +        if (libraryDir.exists() && libraryDir.isDirectory()) {
    +            String[] cpResourceNames = libraryDir.list();
    +            try {
    +                URLClassLoader thisCl = (URLClassLoader) SpringContextFactory.class.getClassLoader();
    +                List<URL> urls = new ArrayList<>();
    +                for (int i = 0; i < cpResourceNames.length; i++) {
    +                    if (!isDuplicate(thisCl.getURLs(), cpResourceNames[i])) {
    +                        URL url = new File(libraryDir, cpResourceNames[i]).toURI().toURL();
    +                        urls.add(url);
    +                        if (logger.isDebugEnabled()) {
    +                            logger.debug("Identifying additional resource to the classpath: " + url);
    +                        }
    +                    }
    +                }
    +                return urls.toArray(new URL[] {});
    +            } catch (Exception e) {
    +                throw new IllegalStateException(
    +                        "Failed to parse user libraries from '" + libraryDir.getAbsolutePath() + "'", e);
    +            }
    +        } else {
    +            throw new IllegalArgumentException("Path '" + libraryDir.getAbsolutePath()
    +                    + "' is not valid because it doesn't exist or does not point to a directory.");
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    static boolean isDuplicate(URL[] currentURLs, String resourceName) {
    +        if (resourceName.startsWith("spring")) {
    +            resourceName = resourceName.substring(0, resourceName.lastIndexOf("-"));
    --- End diff --
    
    Let me review this. This whole method will probably go since this NAR no longer packages any Spring JARs. Basically it's now one of the thinnest NARs in NiFi and only comes with 4-5 JARs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56106557
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor CTX_LIB_PATH = new PropertyDescriptor.Builder()
    +            .name("Application Context class path")
    +            .description("Path to the directory with resources (i.e., JARs, configuration files etc.) required to be on "
    +                            + "the classpath of the ApplicationContext.")
    +            .addValidator(new ClientLibValidator())
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor SEND_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Send Timeout")
    +            .description("Timeout for sending data to Spring Application Context. Defaults to 0.")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Receive Timeout")
    +            .description("Timeout for receiving date from Spring context. Defaults to 0.")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    // ====
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description(
    +                    "All FlowFiles that are successfully received from Spring Application Context are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description(
    +                    "All FlowFiles that cannot be sent to Spring Application Context are routed to this relationship")
    +            .build();
    +
    +    private final static Set<Relationship> relationships;
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    // =======
    +
    +    private volatile String applicationContextConfigFileName;
    +
    +    private volatile String applicationContextLibPath;
    +
    +    private volatile long sendTimeout;
    +
    +    private volatile long receiveTimeout;
    +
    +    private volatile SpringDataExchanger exchanger;
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(CTX_CONFIG_NAME);
    +        _propertyDescriptors.add(CTX_LIB_PATH);
    +        _propertyDescriptors.add(SEND_TIMEOUT);
    +        _propertyDescriptors.add(RECEIVE_TIMEOUT);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void initializeSpringContext(ProcessContext processContext) {
    +        this.applicationContextConfigFileName = processContext.getProperty(CTX_CONFIG_NAME).getValue();
    +        this.applicationContextLibPath = processContext.getProperty(CTX_LIB_PATH).getValue();
    +
    +        String stStr = processContext.getProperty(SEND_TIMEOUT).getValue();
    +        this.sendTimeout = stStr == null ? 0 : FormatUtils.getTimeDuration(stStr, TimeUnit.MILLISECONDS);
    +
    +        String rtStr = processContext.getProperty(RECEIVE_TIMEOUT).getValue();
    +        this.receiveTimeout = rtStr == null ? 0 : FormatUtils.getTimeDuration(rtStr, TimeUnit.MILLISECONDS);
    +
    +        try {
    +            if (logger.isDebugEnabled()) {
    +                logger.debug(
    +                        "Initializing Spring Application Context defined in " + this.applicationContextConfigFileName);
    +            }
    +            this.exchanger = SpringContextFactory.createSpringContextDelegate(this.applicationContextLibPath,
    +                    this.applicationContextConfigFileName);
    +        } catch (Exception e) {
    +            throw new IllegalStateException("Failed while initializing Spring Application Context", e);
    +        }
    +        if (logger.isInfoEnabled()) {
    +            logger.info("Successfully initialized Spring Application Context defined in "
    +                    + this.applicationContextConfigFileName);
    +        }
    +    }
    +
    +    /**
    +     * Will close the 'exchanger' which in turn will close both Spring
    +     * Application Context and the ClassLoader that loaded it allowing new
    +     * instance of Spring Application Context to be created upon the next start
    +     * (which may have an updated classpath and functionality) without
    +     * restarting NiFi.
    +     */
    +    @OnStopped
    +    public void closeSpringContext(ProcessContext processContext) {
    +        if (this.exchanger != null) {
    +            try {
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug(
    +                            "Closing Spring Application Context defined in " + this.applicationContextConfigFileName);
    +                }
    +                this.exchanger.close();
    +                if (logger.isInfoEnabled()) {
    +                    logger.info("Successfully closed Spring Application Context defined in "
    +                            + this.applicationContextConfigFileName);
    +                }
    +            } catch (IOException e) {
    +                getLogger().warn("Failed while closing Spring Application Context", e);
    +            }
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
    +        FlowFile flowFileToProcess = processSession.get();
    +        if (flowFileToProcess != null) {
    +            this.sendToSpring(flowFileToProcess, context, processSession);
    --- End diff --
    
    Good point


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56105950
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/bootstrap/SpringContextDelegate.java ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring.bootstrap;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.net.URLClassLoader;
    +import java.util.Map;
    +
    +import org.apache.nifi.spring.SpringDataExchanger;
    +import org.apache.nifi.spring.SpringNiFiConstants;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.context.support.ClassPathXmlApplicationContext;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +import org.springframework.messaging.support.MessageBuilder;
    +
    +/**
    + * Scopes instance of itself to a dedicated {@link ClassLoader}, thus allowing
    + * Spring Application Context and its class path to be modified and refreshed by
    + * simply re-starting SpringContextProcessor. Also ensures that there are no
    + * class path collisions between multiple instances of Spring Context Processor
    + * which are loaded by the same NAR Class Loader.
    + */
    +/*
    + * This class is for internal use only and must never be instantiated by the NAR
    + * Class Loader (hence in a isolated package with nothing referencing it). It is
    + * loaded by a dedicated CL via byte array that represents it ensuring that this
    + * class can be loaded multiple times by multiple Class Loaders within a single
    + * instance of NAR.
    + */
    +final class SpringContextDelegate implements Closeable, SpringDataExchanger {
    +
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextDelegate.class);
    +
    +    private final ClassPathXmlApplicationContext applicationContext;
    +
    +    private final MessageChannel toSpringChannel;
    +
    +    private final PollableChannel fromSpringChannel;
    +
    +    /**
    +     *
    +     */
    +    private SpringContextDelegate(String configName) {
    +        ClassLoader orig = Thread.currentThread().getContextClassLoader();
    +        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +        if (logger.isDebugEnabled()) {
    +            logger.debug("Using " + Thread.currentThread().getContextClassLoader()
    +                    + " as context class loader while loading Spring Context '" + configName + "'.");
    +        }
    +        try {
    +            this.applicationContext = new ClassPathXmlApplicationContext(configName);
    +            if (this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI)){
    +                this.toSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class);
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug("Spring Application Context defined in '" + configName
    +                            + "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered.");
    +                }
    +            } else {
    +                this.toSpringChannel = null;
    +            }
    +            if (this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI)){
    +                this.fromSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class);
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug("Spring Application Context defined in '" + configName
    +                            + "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered.");
    +                }
    +            } else {
    +                this.fromSpringChannel = null;
    +            }
    +            if (logger.isInfoEnabled() && this.toSpringChannel == null && this.fromSpringChannel == null){
    +                logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. "
    +                        + "No data will be exchanged.");
    +            }
    +        } finally {
    +            Thread.currentThread().setContextClassLoader(orig);
    +        }
    +    }
    +
    +    /**
    +     *
    +     */
    +    @Override
    +    public <T> boolean send(T payload, Map<String, ?> messageHeaders, long timeout) {
    +        if (this.toSpringChannel != null){
    +            return this.toSpringChannel.send(MessageBuilder.withPayload(payload).copyHeaders(messageHeaders).build(), timeout);
    +        }
    +        return false;
    +    }
    +
    +    /**
    +     *
    +     */
    +    @SuppressWarnings("unchecked")
    +    @Override
    +    public <T> SpringResponse<T> receive(long timeout) {
    +        if (this.fromSpringChannel != null) {
    +            final Message<T> message = (Message<T>) this.fromSpringChannel.receive(timeout);
    +            if (message != null) {
    +                if (!(message.getPayload() instanceof byte[]) && !(message.getPayload() instanceof String)) {
    +                    throw new IllegalStateException("Failed while receiving message from Spring due to the "
    +                            + "payload type being other then byte[] which is currently not supported. Please "
    --- End diff --
    
    not sure on this one, but should this be "byte[] or String"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56447125
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    It was logged and I figured out my issue almost immediately, if it was easy, it'd be worth adding.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56102458
  
    --- Diff: nifi-nar-bundles/pom.xml ---
    @@ -54,7 +54,8 @@
             <module>nifi-scripting-bundle</module>
             <module>nifi-elasticsearch-bundle</module>
             <module>nifi-amqp-bundle</module>
    -	<module>nifi-splunk-bundle</module>
    +	    <module>nifi-splunk-bundle</module>
    --- End diff --
    
    something strange is happening with whitespace here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/271#discussion_r56445529
  
    --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.spring;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.apache.nifi.util.FormatUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.messaging.Message;
    +import org.springframework.messaging.MessageChannel;
    +import org.springframework.messaging.PollableChannel;
    +
    +/**
    + * Implementation of {@link Processor} capable of sending and receiving data
    + * from application defined in Spring Application context. It does so via
    + * predefined in/out {@link MessageChannel}s (see spring-messaging module of
    + * Spring). Once such channels are defined user is free to implement the rest of
    + * the application any way they wish (e.g., custom code and/or using frameworks
    + * such as Spring Integration or Camel).
    + * <p>
    + * The requirement and expectations for channel types are:
    + * <ul>
    + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
    + * (see {@link SpringNiFiConstants#FROM_NIFI})</li>
    + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
    + * (see {@link SpringNiFiConstants#TO_NIFI})</li>
    + * </ul>
    + * </p>
    + * Below is the example of sample configuration:
    + *
    + * <pre>
    + * &lt;?xml version="1.0" encoding="UTF-8"?&gt;
    + * &lt;beans xmlns="http://www.springframework.org/schema/beans"
    + *   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    + *   xmlns:int="http://www.springframework.org/schema/integration"
    + *  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    + *      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
    + *
    + *  &lt;int:channel id="fromNiFi"/&gt;
    + *
    + *  . . . . .
    + *
    + *  &lt;int:channel id="toNiFi"&gt;
    + *      &lt;int:queue/&gt;
    + *  &lt;/int:channel&gt;
    + *
    + * &lt;/beans&gt;
    + * </pre>
    + * <p>
    + * Defining {@link MessageChannel} is optional. That's why this processor
    + * supports 3 modes of interaction with Spring Application Context:
    + * <ul>
    + * <li>Headless – no channels are defined therefore nothing is sent to or
    + * received from such Application Contexts (i.e., some monitoring app).</li>
    + * <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
    + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
    + * </li>
    + * <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
    + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
    + * the Spring Application Context</li>
    + * </ul>
    + *
    + * </p>
    + * <p>
    + * To create an instance of the ApplicationConetxt this processor requires user
    + * to provide configuration file path and the path to the resources that needs
    + * to be added to the classpath of ApplicationContext. This essentially allows
    + * user to package their Spring Application any way they want as long as
    + * everything it requires is available on the classpath.
    + * </p>
    + * <p>
    + * Data exchange between Spring and NiFi relies on simple mechanism which is
    + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
    + * converted to primitive representation that can be easily wrapped in Spring
    + * {@link Message}. The requirement imposed by this Processor is to send/receive
    + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
    + * <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
    + * safety. Converters and Transformers could be used by either side to change
    + * representation of the content that is being exchanged between NiFi and
    + * Spring.
    + */
    +@TriggerWhenEmpty
    +@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
    +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
    +        + "Spring Application Context via predefined in/out MessageChannels.")
    +public class SpringContextProcessor extends AbstractProcessor {
    +    private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
    +
    +    public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder()
    +            .name("Application Context config path")
    +            .description("The path to the Spring Application Context configuration file relative to the classpath")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Hmm, that is actually doable, although tricky since the file may be inside any of the JARs as well as in plain site. Do you think it's important to have now or enhancement for 0.7?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1571 initial commit of SpringContext suppo...

Posted by trkurc <gi...@git.apache.org>.
Github user trkurc commented on the pull request:

    https://github.com/apache/nifi/pull/271#issuecomment-197323770
  
    @olegz: A sample app would be great. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---