Posted to dev@apex.apache.org by bbende <gi...@git.apache.org> on 2015/12/15 17:22:38 UTC

[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

GitHub user bbende opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133

    MLHR-1936 Adding NiFi operators to contrib

    This pull request adds NiFi operators to the contrib module. The integration is based on the NiFi site-to-site client, which is the mechanism for pulling data from, or pushing data to, a NiFi instance or cluster. NiFi uses it internally when one instance communicates with another, and third-party frameworks use it as well. In a cluster, the site-to-site client knows about the nodes in the cluster and will pull from or push to all of them.
    
    To test the operators against a running NiFi instance you can do the following:
    
    * Download the latest NiFi release and extract it: https://nifi.apache.org/download.html
    * Edit conf/nifi.properties and set nifi.remote.input.socket.port to any available port, and nifi.remote.input.secure=false
    * Start NiFi with bin/nifi.sh start
    * Go to the NiFi UI in your browser: http://localhost:8080/nifi/
    * Import the NiFi_Apex template that will be attached to the JIRA (template button in the top-right toolbar)
    * Drag the template onto the canvas (button near the top-left of the toolbar)
    * Start all of the processors and ports 
    * Run either TestNiFiInputApplication or TestNiFiOutputApplication in contrib/src/test/java/com.datatorrent.contrib.nifi.demo
    
    
    ![nifi-apex-flow](https://cloud.githubusercontent.com/assets/605416/11816192/651b6f36-a31d-11e5-806f-49d3a4a6a159.jpg)
    
    The left side of the flow generates fake data and sends it to a NiFi output port. In the case of the Malhar NiFi input operator, the NiFi site-to-site client pulls data from this output port. The right side has a NiFi input port waiting for data to be pushed from the Malhar NiFi output operator.
    
    Some questions/comments....
    
    * Is using activate/deactivate preferred over setup/teardown? 
    * I wasn't sure if calling Thread.sleep in emitTuples was appropriate here. We did this in other integrations such as Storm and Spark, but in those cases we were within a while loop, so we didn't want to make a ton of requests when no data was available. I'm not familiar with how Malhar will call emitTuples, so let me know if this doesn't make sense here.
    * I wasn't sure about the preferred approach for exception handling; it seems wrapping a checked exception with RuntimeException is appropriate?
    * I noticed some other operators used a WindowDataManager, which I think was to be able to replay data. I didn't really know enough about that to know if it was needed here.
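
On the Thread.sleep question, one option is to pause only when a poll comes back empty, so that a busy source is never slowed down. The sketch below is a minimal illustration of that idea and nothing more: `BackoffEmitter`, its `source` supplier, and the injected `sleeper` are hypothetical stand-ins, not Malhar or NiFi classes, and the pause length is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for an input operator; not the Malhar or NiFi API.
class BackoffEmitter {
  private final Supplier<List<String>> source; // assumed to return an empty list when idle
  private final LongConsumer sleeper;          // injected pause, e.g. a Thread.sleep wrapper
  private final long idleSleepMillis;
  final List<String> emitted = new ArrayList<>();

  BackoffEmitter(Supplier<List<String>> source, LongConsumer sleeper, long idleSleepMillis) {
    this.source = source;
    this.sleeper = sleeper;
    this.idleSleepMillis = idleSleepMillis;
  }

  // One invocation of the emit step: emit what is available, otherwise pause briefly.
  void emitTuples() {
    List<String> batch = source.get();
    if (batch.isEmpty()) {
      sleeper.accept(idleSleepMillis);
      return;
    }
    emitted.addAll(batch);
  }

  public static void main(String[] args) {
    BackoffEmitter emitter = new BackoffEmitter(
        Collections::emptyList,
        millis -> System.out.println("idle, sleeping " + millis + " ms"),
        10L);
    emitter.emitTuples();
  }
}
```

Injecting the pause keeps the sketch testable without real delays; in an operator the `sleeper` would presumably wrap Thread.sleep.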


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/incubator-apex-malhar MLHR-1936.nifi-connector

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #133
    
----
commit c58087a47ddeb19fd917b767307ae3562a04f90b
Author: Bryan Bende <bb...@apache.org>
Date:   2015-12-14T22:34:59Z

    MLHR-1936 Adding NiFi operators to contrib

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/133



Re: [GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by Bryan Bende <bb...@gmail.com>.
Seems like top-level comments on pull requests don't generate emails, so
sending this here just as an FYI...

I updated this pull request based on everyone's feedback. The biggest
change here is adding the WindowDataManager to ensure data is saved before
we complete a NiFi transaction, since once the transaction completes the
data is gone from NiFi and could not be recovered if we lost it. I based
this mostly on the RabbitMQ operators, but let me know if anything doesn't
seem right.
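
The ordering described above (save first, complete the transaction second, reuse saved data on replay) can be sketched as follows. This is only an illustration under stated assumptions: `ReplayableReceiver`, its `Txn` interface, and the in-memory map are hypothetical stand-ins and do not reflect the actual WindowDataManager or NiFi Transaction APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; neither the WindowDataManager nor the NiFi Transaction API.
class ReplayableReceiver {
  interface Txn {
    List<String> receiveAll(); // pull everything the transaction offers
    void complete();           // after this the source may discard the data
  }

  // Stand-in for durable per-window storage (an in-memory map here).
  private final Map<Long, List<String>> savedByWindow = new HashMap<>();

  List<String> receive(long windowId, Txn txn) {
    List<String> saved = savedByWindow.get(windowId);
    if (saved != null) {
      return saved; // replayed window: reuse saved data, leave the source alone
    }
    List<String> batch = new ArrayList<>(txn.receiveAll());
    savedByWindow.put(windowId, batch); // 1. save the data for this window
    txn.complete();                     // 2. only then complete the transaction
    return batch;
  }
}
```

If the operator fails between the two steps, the data is still held by the source; if it fails after them, the saved copy can be replayed for the same window.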

Thanks,

Bryan

On Tue, Jan 26, 2016 at 4:46 PM, bbende <gi...@git.apache.org> wrote:

> Github user bbende commented on a diff in the pull request:
>
>
> https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r50905617
>
>     --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
>     @@ -0,0 +1,162 @@
>     +/**
>     + * Licensed to the Apache Software Foundation (ASF) under one
>     + * or more contributor license agreements.  See the NOTICE file
>     + * distributed with this work for additional information
>     + * regarding copyright ownership.  The ASF licenses this file
>     + * to you under the Apache License, Version 2.0 (the
>     + * "License"); you may not use this file except in compliance
>     + * with the License.  You may obtain a copy of the License at
>     + *
>     + *   http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing,
>     + * software distributed under the License is distributed on an
>     + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>     + * KIND, either express or implied.  See the License for the
>     + * specific language governing permissions and limitations
>     + * under the License.
>     + */
>     +package com.datatorrent.contrib.nifi;
>     +
>     +import java.io.IOException;
>     +import java.util.ArrayList;
>     +import java.util.List;
>     +
>     +import org.slf4j.Logger;
>     +import org.slf4j.LoggerFactory;
>     +
>     +import org.apache.nifi.remote.Transaction;
>     +import org.apache.nifi.remote.TransferDirection;
>     +import org.apache.nifi.remote.client.SiteToSiteClient;
>     +import org.apache.nifi.remote.protocol.DataPacket;
>     +
>     +import com.datatorrent.api.Context;
>     +import com.datatorrent.api.InputOperator;
>     +
>     +/**
>     + * This is the base implementation of a NiFi input operator.&nbsp;
>     + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
>     + * <p>
>     + * Ports:<br>
>     + * <b>Input</b>: No input port<br>
>     + * <b>Output</b>: Can have any number of output ports<br>
>     + * <br>
>     + * Properties:<br>
>     + * None<br>
>     + * <br>
>     + * Compile time checks:<br>
>     + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
>     + * and createTuple(DataPacket dp)<br>
>     + * <br>
>     + * Run time checks:<br>
>     + * None<br>
>     + * <br>
>     + * Benchmarks:<br>
>     + * TBD<br>
>     + * </p>
>     + *
>     + * @displayName Abstract NiFi Input
>     + * @category Messaging
>     + * @tags input operator
>     + * @since 3.3.0
>     + */
>     +
>     +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
>     +{
>     +
>     +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
>     +
>     +  private SiteToSiteClient client;
>     +  private final SiteToSiteClient.Builder siteToSiteBuilder;
>     +
>     +  /**
>     +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
>     +   */
>     +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
>     +  {
>     +    this.siteToSiteBuilder = siteToSiteBuilder;
>     +  }
>     +
>     +  @Override
>     +  public void setup(Context.OperatorContext context)
>     +  {
>     +    this.client = siteToSiteBuilder.build();
>     +  }
>     +
>     +  @Override
>     +  public void teardown()
>     +  {
>     +    try {
>     +      client.close();
>     +    } catch (IOException e) {
>     +      LOGGER.error(e.getMessage(), e);
>     +    }
>     +  }
>     +
>     +  @Override
>     +  public void emitTuples()
>     +  {
>     +    try {
>     +      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
>     +      if (transaction == null) {
>     +        LOGGER.warn("A transaction could not be created, returning...");
>     +        return;
>     +      }
>     +
>     +      DataPacket dataPacket = transaction.receive();
>     +      if (dataPacket == null) {
>     +        transaction.confirm();
>     +        transaction.complete();
>     +        LOGGER.debug("No data available to pull, returning and will try again...");
>     +        return;
>     +      }
>     +
>     +      // read all of the available data packets and convert to the given type
>     +      final List<T> tuples = new ArrayList<>();
>     +      do {
>     +        tuples.add(createTuple(dataPacket));
>     +        dataPacket = transaction.receive();
>     +      } while (dataPacket != null);
>     --- End diff --
>
>     shouldn't be able to get an infinite loop... transaction.receive()
> will return null when there is no more data to pull, or when it has
> pulled the maximum number of data packets for a transaction (configured on
> the site-to-site client)
>
>
>

[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r50905617
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
    @@ -0,0 +1,162 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +
    +/**
    + * This is the base implementation of a NiFi input operator.&nbsp;
    + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
    + * <p>
    + * Ports:<br>
    + * <b>Input</b>: No input port<br>
    + * <b>Output</b>: Can have any number of output ports<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
    + * and createTuple(DataPacket dp)<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Input
    + * @category Messaging
    + * @tags input operator
    + * @since 3.3.0
    + */
    +
    +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    +
    +  private SiteToSiteClient client;
    +  private final SiteToSiteClient.Builder siteToSiteBuilder;
    +
    +  /**
    +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
    +   */
    +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
    +  {
    +    this.siteToSiteBuilder = siteToSiteBuilder;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    this.client = siteToSiteBuilder.build();
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      client.close();
    +    } catch (IOException e) {
    +      LOGGER.error(e.getMessage(), e);
    +    }
    +  }
    +
    +  @Override
    +  public void emitTuples()
    +  {
    +    try {
    +      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
    +      if (transaction == null) {
    +        LOGGER.warn("A transaction could not be created, returning...");
    +        return;
    +      }
    +
    +      DataPacket dataPacket = transaction.receive();
    +      if (dataPacket == null) {
    +        transaction.confirm();
    +        transaction.complete();
    +        LOGGER.debug("No data available to pull, returning and will try again...");
    +        return;
    +      }
    +
    +      // read all of the available data packets and convert to the given type
    +      final List<T> tuples = new ArrayList<>();
    +      do {
    +        tuples.add(createTuple(dataPacket));
    +        dataPacket = transaction.receive();
    +      } while (dataPacket != null);
    --- End diff --
    
    shouldn't be able to get an infinite loop... transaction.receive() will return null when there is no more data to pull, or when it has pulled the maximum number of data packets for a transaction (configured on the site-to-site client)
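
To illustrate the termination argument, here is a small sketch in which the loop from the diff drains a source that never runs dry. `BoundedTransaction` and `DrainLoop` are hypothetical stand-ins written for this example; they model only the behaviour described in the comment (receive() returns null once a per-transaction limit is reached) and are not the NiFi Transaction class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical stand-in for a receive-side transaction; not the NiFi Transaction class.
class BoundedTransaction {
  private final Iterator<String> source;
  private final int maxBatchCount; // assumed per-transaction packet limit
  private int delivered;

  BoundedTransaction(Iterator<String> source, int maxBatchCount) {
    this.source = source;
    this.maxBatchCount = maxBatchCount;
  }

  // Returns null when the source is exhausted or the batch limit is reached.
  String receive() {
    if (delivered >= maxBatchCount || !source.hasNext()) {
      return null;
    }
    delivered++;
    return source.next();
  }
}

class DrainLoop {
  // Same do/while shape as the diff: the first packet is read before the loop.
  static List<String> drain(BoundedTransaction transaction) {
    List<String> tuples = new ArrayList<>();
    String packet = transaction.receive();
    if (packet == null) {
      return tuples;
    }
    do {
      tuples.add(packet);
      packet = transaction.receive();
    } while (packet != null);
    return tuples;
  }

  public static void main(String[] args) {
    // An endless source still yields a finite batch because of the limit.
    Iterator<String> endless = Stream.generate(() -> "packet").iterator();
    System.out.println(drain(new BoundedTransaction(endless, 3)).size()); // prints 3
  }
}
```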



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48077653
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
    @@ -0,0 +1,162 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +
    +/**
    + * This is the base implementation of a NiFi input operator.&nbsp;
    + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
    + * <p>
    + * Ports:<br>
    + * <b>Input</b>: No input port<br>
    + * <b>Output</b>: Can have any number of output ports<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
    + * and createTuple(DataPacket dp)<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Input
    + * @category Messaging
    + * @tags input operator
    + * @since 3.3.0
    + */
    +
    +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    +
    +  private SiteToSiteClient client;
    +  private final SiteToSiteClient.Builder siteToSiteBuilder;
    +
    +  /**
    +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
    +   */
    +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
    +  {
    +    this.siteToSiteBuilder = siteToSiteBuilder;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    this.client = siteToSiteBuilder.build();
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      client.close();
    +    } catch (IOException e) {
    +      LOGGER.error(e.getMessage(), e);
    --- End diff --
    
    don't swallow exception
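
One way to act on this comment is to rethrow the checked exception wrapped in a RuntimeException, keeping the original as the cause. The sketch below assumes that approach; `TeardownExample` is a hypothetical helper and the `Closeable` parameter merely stands in for the site-to-site client.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical helper; the Closeable stands in for the site-to-site client.
class TeardownExample {
  static void teardown(Closeable client) {
    try {
      client.close();
    } catch (IOException e) {
      // Propagate instead of only logging, keeping the original exception as the cause.
      throw new RuntimeException("Failed to close client", e);
    }
  }

  public static void main(String[] args) {
    try {
      teardown(() -> {
        throw new IOException("connection reset");
      });
    } catch (RuntimeException e) {
      System.out.println(e.getCause().getMessage()); // prints connection reset
    }
  }
}
```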



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48077817
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
    @@ -0,0 +1,162 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +
    +/**
    + * This is the base implementation of a NiFi input operator.&nbsp;
    + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
    + * <p>
    + * Ports:<br>
    + * <b>Input</b>: No input port<br>
    + * <b>Output</b>: Can have any number of output ports<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
    + * and createTuple(DataPacket dp)<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Input
    + * @category Messaging
    + * @tags input operator
    + * @since 3.3.0
    + */
    +
    +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    +
    +  private SiteToSiteClient client;
    +  private final SiteToSiteClient.Builder siteToSiteBuilder;
    +
    +  /**
    +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
    +   */
    +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
    +  {
    +    this.siteToSiteBuilder = siteToSiteBuilder;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    this.client = siteToSiteBuilder.build();
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      client.close();
    +    } catch (IOException e) {
    +      LOGGER.error(e.getMessage(), e);
    +    }
    +  }
    +
    +  @Override
    +  public void emitTuples()
    +  {
    +    try {
    +      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
    +      if (transaction == null) {
    +        LOGGER.warn("A transaction could not be created, returning...");
    +        return;
    +      }
    +
    +      DataPacket dataPacket = transaction.receive();
    +      if (dataPacket == null) {
    +        transaction.confirm();
    +        transaction.complete();
    +        LOGGER.debug("No data available to pull, returning and will try again...");
    +        return;
    +      }
    +
    +      // read all of the available data packets and convert to the given type
    +      final List<T> tuples = new ArrayList<>();
    +      do {
    +        tuples.add(createTuple(dataPacket));
    +        dataPacket = transaction.receive();
    +      } while (dataPacket != null);
    --- End diff --
    
    can this lead to an infinite loop? Maybe you want to add a condition on the number of DataPackets as well?
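
A caller-side limit of the kind suggested here could look like the sketch below. It is an assumption-laden illustration: `CappedDrain` and the `maxPackets` parameter are hypothetical, and the `Supplier` stands in for repeated calls to transaction.receive(), with null meaning that no more data is available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper; receive stands in for transaction.receive().
class CappedDrain {
  static List<String> drain(Supplier<String> receive, int maxPackets) {
    List<String> tuples = new ArrayList<>();
    if (maxPackets <= 0) {
      return tuples;
    }
    String packet = receive.get();
    while (packet != null) {
      tuples.add(packet);
      if (tuples.size() >= maxPackets) {
        break; // stop before pulling a packet that would have to be dropped
      }
      packet = receive.get();
    }
    return tuples;
  }
}
```

The check sits after the add so that the loop never pulls a packet it is not going to keep.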



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48071410
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
    @@ -0,0 +1,162 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +
    +/**
    + * This is the base implementation of a NiFi input operator.&nbsp;
    + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
    + * <p>
    + * Ports:<br>
    + * <b>Input</b>: No input port<br>
    + * <b>Output</b>: Can have any number of output ports<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
    + * and createTuple(DataPacket dp)<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Input
    + * @category Messaging
    + * @tags input operator
    + * @since 3.3.0
    --- End diff --
    
    since tag not required



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48081357
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java ---
    @@ -0,0 +1,118 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * This is the base implementation of a NiFi output operator.&nbsp;
    + * A concrete operator should be created from this skeleton implementation.
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Can have any number of input ports<br>
    + * <b>Output</b>: no output port<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * None<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Output
    + * @category Messaging
    + * @tags output operator
    + *
    + * @since 3.3.0
    + *
    + */
    +public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiOutputOperator.class);
    +
    +  protected SiteToSiteClient client;
    +  protected final SiteToSiteClient.Builder siteToSiteBuilder;
    +  protected final NiFiDataPacketBuilder<T> dataPacketBuilder;
    +
    +  /**
    +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
    +   * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data
    +   */
    +  public AbstractNiFiOutputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
    +      final NiFiDataPacketBuilder<T> dataPacketBuilder)
    +  {
    +    this.siteToSiteBuilder = siteToSiteBuilder;
    +    this.dataPacketBuilder = dataPacketBuilder;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    this.client = siteToSiteBuilder.build();
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      client.close();
    +    } catch (IOException e) {
    +      LOGGER.error(e.getMessage(), e);
    --- End diff --
    
    throw exception



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48071313
  
    --- Diff: contrib/pom.xml ---
    @@ -220,6 +220,11 @@
     
       <dependencies>
         <dependency>
    +      <groupId>org.apache.nifi</groupId>
    +      <artifactId>nifi-site-to-site-client</artifactId>
    +      <version>0.4.0</version>
    --- End diff --
    
    This dependency should be marked optional



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48077190
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
    @@ -0,0 +1,162 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +
    +/**
    + * This is the base implementation of a NiFi input operator.&nbsp;
    + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
    + * <p>
    + * Ports:<br>
    + * <b>Input</b>: No input port<br>
    + * <b>Output</b>: Can have any number of output ports<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
    + * and createTuple(DataPacket dp)<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Input
    + * @category Messaging
    + * @tags input operator
    + * @since 3.3.0
    + */
    +
    +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    +
    +  private SiteToSiteClient client;
    +  private final SiteToSiteClient.Builder siteToSiteBuilder;
    +
    +  /**
    +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
    +   */
    +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
    +  {
    +    this.siteToSiteBuilder = siteToSiteBuilder;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    this.client = siteToSiteBuilder.build();
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      client.close();
    +    } catch (IOException e) {
    +      LOGGER.error(e.getMessage(), e);
    +    }
    +  }
    +
    +  @Override
    +  public void emitTuples()
    +  {
    +    try {
    +      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
    +      if (transaction == null) {
    +        LOGGER.warn("A transaction could not be created, returning...");
    +        return;
    +      }
    +
    +      DataPacket dataPacket = transaction.receive();
    +      if (dataPacket == null) {
    +        transaction.confirm();
    +        transaction.complete();
    +        LOGGER.debug("No data available to pull, returning and will try again...");
    +        return;
    +      }
    +
    +      // read all of the available data packets and convert to the given type
    +      final List<T> tuples = new ArrayList<>();
    +      do {
    +        tuples.add(createTuple(dataPacket));
    +        dataPacket = transaction.receive();
    +      } while (dataPacket != null);
    +
    +      // confirm all of the expected data was received
    +      transaction.confirm();
    +
    +      // delegate to sub-classes to emit the tuples
    +      emitTuples(tuples);
    +
    +      // everything was successful so complete the transaction
    +      transaction.complete();
    +
    --- End diff --
    
    Confirming here will lead to data loss if the operator fails. We would need to write the data to fault-tolerant storage before confirming the transaction. This is similar to JMS, where we cannot acknowledge a message before it has been stored. The place to store the data would be endWindow. It looks like the pattern from JMS applies here. Kafka and files are different because they retain the data, so we can simply go back to where the operator left off.
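The ordering described in this comment can be sketched as follows, assuming the transaction may stay open until the window ends (whether the NiFi client permits that is not confirmed here). `PullTransaction`, `WindowStore` and `StoreBeforeConfirmSketch` are hypothetical stand-ins with `String` payloads, not the NiFi or Malhar API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a receive-side transaction; not the NiFi API.
interface PullTransaction
{
  String receive(); // returns null when no more data is available

  void confirm();

  void complete();
}

// Hypothetical fault-tolerant store keyed by window id.
interface WindowStore
{
  void save(long windowId, List<String> tuples);
}

class StoreBeforeConfirmSketch
{
  private final WindowStore store;
  private final List<String> windowBuffer = new ArrayList<>();
  private final List<PullTransaction> open = new ArrayList<>();
  private long windowId;

  StoreBeforeConfirmSketch(WindowStore store)
  {
    this.store = store;
  }

  void beginWindow(long id)
  {
    this.windowId = id;
  }

  // Pull everything available, but leave the transaction unconfirmed.
  void pull(PullTransaction txn)
  {
    for (String packet = txn.receive(); packet != null; packet = txn.receive()) {
      windowBuffer.add(packet);
    }
    open.add(txn);
  }

  void endWindow()
  {
    // 1. Persist the window's data first.
    store.save(windowId, new ArrayList<>(windowBuffer));
    // 2. Only then acknowledge, so a crash before this point loses nothing.
    for (PullTransaction txn : open) {
      txn.confirm();
      txn.complete();
    }
    open.clear();
    windowBuffer.clear();
  }
}
```

If the operator fails between `pull` and `endWindow`, nothing has been confirmed, so in this sketch the source still holds the data and can hand it out again.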



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by DT-Priyanka <gi...@git.apache.org>.
Github user DT-Priyanka commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48225249
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java ---
    @@ -0,0 +1,118 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * This is the base implementation of a NiFi output operator.&nbsp;
    + * A concrete operator should be created from this skeleton implementation.
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Can have any number of input ports<br>
    + * <b>Output</b>: no output port<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * None<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Output
    + * @category Messaging
    + * @tags output operator
    + *
    + * @since 3.3.0
    + *
    + */
    +public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiOutputOperator.class);
    +
    +  protected SiteToSiteClient client;
    +  protected final SiteToSiteClient.Builder siteToSiteBuilder;
    +  protected final NiFiDataPacketBuilder<T> dataPacketBuilder;
    +
    +  /**
    +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
    +   * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data
    +   */
    +  public AbstractNiFiOutputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
    +      final NiFiDataPacketBuilder<T> dataPacketBuilder)
    +  {
    +    this.siteToSiteBuilder = siteToSiteBuilder;
    +    this.dataPacketBuilder = dataPacketBuilder;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    this.client = siteToSiteBuilder.build();
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      client.close();
    +    } catch (IOException e) {
    +      LOGGER.error(e.getMessage(), e);
    +    }
    +  }
    +
    +  protected void processTuple(T tuple)
    +  {
    +    final NiFiDataPacket niFiDataPacket = dataPacketBuilder.createNiFiDataPacket(tuple);
    +
    +    try {
    +      final Transaction transaction = client.createTransaction(TransferDirection.SEND);
    --- End diff --
    
    If creating a transaction is a costly operation, we could batch the tuples and send them in one transaction during "endWindow". Is creating a transaction to send only one tuple optimal?
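A rough sketch of the batching idea, assuming tuples can be held in memory until the window closes. `SendTransaction`, `SendTransactionFactory` and `BatchingOutputSketch` are hypothetical names with `String` payloads; the factory stands in for the `client.createTransaction(TransferDirection.SEND)` call in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a send-side transaction; not the NiFi API.
interface SendTransaction
{
  void send(String packet);

  void confirm();

  void complete();
}

// Hypothetical factory standing in for the client's transaction creation.
interface SendTransactionFactory
{
  SendTransaction create();
}

class BatchingOutputSketch
{
  private final SendTransactionFactory factory;
  private final List<String> batch = new ArrayList<>();

  BatchingOutputSketch(SendTransactionFactory factory)
  {
    this.factory = factory;
  }

  // Buffer the tuple instead of opening a transaction for it.
  void processTuple(String tuple)
  {
    batch.add(tuple);
  }

  // Send the whole window in a single transaction.
  void endWindow()
  {
    if (batch.isEmpty()) {
      return; // nothing to send, so no transaction is created
    }
    SendTransaction txn = factory.create();
    for (String tuple : batch) {
      txn.send(tuple);
    }
    txn.confirm();
    txn.complete();
    batch.clear();
  }
}
```

The trade-off in this sketch is latency: tuples are held until the window closes, in exchange for one transaction per window instead of one per tuple.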



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48077812
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
    @@ -0,0 +1,162 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +
    +/**
    + * This is the base implementation of a NiFi input operator.&nbsp;
    + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
    + * <p>
    + * Ports:<br>
    + * <b>Input</b>: No input port<br>
    + * <b>Output</b>: Can have any number of output ports<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
    + * and createTuple(DataPacket dp)<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Input
    + * @category Messaging
    + * @tags input operator
    + * @since 3.3.0
    + */
    +
    +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    +
    +  private SiteToSiteClient client;
    +  private final SiteToSiteClient.Builder siteToSiteBuilder;
    +
    +  /**
    +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
    +   */
    +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
    +  {
    +    this.siteToSiteBuilder = siteToSiteBuilder;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    this.client = siteToSiteBuilder.build();
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      client.close();
    +    } catch (IOException e) {
    +      LOGGER.error(e.getMessage(), e);
    +    }
    +  }
    +
    +  @Override
    +  public void emitTuples()
    +  {
    +    try {
    +      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
    +      if (transaction == null) {
    +        LOGGER.warn("A transaction could not be created, returning...");
    +        return;
    +      }
    +
    +      DataPacket dataPacket = transaction.receive();
    +      if (dataPacket == null) {
    +        transaction.confirm();
    +        transaction.complete();
    +        LOGGER.debug("No data available to pull, returning and will try again...");
    +        return;
    +      }
    +
    +      // read all of the available data packets and convert to the given type
    +      final List<T> tuples = new ArrayList<>();
    +      do {
    +        tuples.add(createTuple(dataPacket));
    +        dataPacket = transaction.receive();
    +      } while (dataPacket != null);
    +
    +      // confirm all of the expected data was received
    +      transaction.confirm();
    +
    +      // delegate to sub-classes to emit the tuples
    +      emitTuples(tuples);
    +
    +      // everything was successful so complete the transaction
    +      transaction.complete();
    +
    --- End diff --
    
    The JMS code is here:  https://github.com/apache/incubator-apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
    
    It also shows how data is managed by window for replay (you already asked about the window data manager). It is a write-ahead log that, on replay, lets us retain the relationship between individual events and windows. That is how idempotency is achieved. Downstream processing that is window-based (as opposed to event-by-event) will require this.
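To illustrate the window-to-events relationship, here is a small sketch that records what each window emitted and reproduces it on replay. `WindowReplaySketch` is a hypothetical name and is not the Malhar window data manager; the in-memory map is only a placeholder for durable storage, since a real log has to survive operator failure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowReplaySketch
{
  // Hypothetical in-memory stand-in for a durable per-window log.
  private final Map<Long, List<String>> log = new HashMap<>();

  // Returns the tuples to emit for the given window.
  List<String> tuplesFor(long windowId, List<String> freshlyPulled)
  {
    List<String> recorded = log.get(windowId);
    if (recorded != null) {
      // Replay: ignore new data and reproduce the window as first emitted.
      return recorded;
    }
    // First pass: record the window's contents before emitting them.
    List<String> copy = new ArrayList<>(freshlyPulled);
    log.put(windowId, copy);
    return copy;
  }
}
```

Because a replayed window returns exactly what was recorded the first time, window-based downstream logic sees the same input on both passes.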



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r48071757
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
    @@ -0,0 +1,162 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.remote.protocol.DataPacket;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.InputOperator;
    +
    +/**
    + * This is the base implementation of a NiFi input operator.&nbsp;
    + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
    + * <p>
    + * Ports:<br>
    + * <b>Input</b>: No input port<br>
    + * <b>Output</b>: Can have any number of output ports<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
    + * and createTuple(DataPacket dp)<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Input
    + * @category Messaging
    + * @tags input operator
    + * @since 3.3.0
    + */
    +
    +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    +
    +  private SiteToSiteClient client;
    --- End diff --
    
    This should be marked transient, as it is initialized in setup.
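The effect of `transient` can be shown with plain Java serialization, used here only as an analogy; the platform's actual checkpoint mechanism may differ. `LiveConnection` and `TransientFieldSketch` are hypothetical names. Without the `transient` modifier, serializing this object would fail because `LiveConnection` is not serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a live connection that cannot be serialized.
class LiveConnection
{
}

class TransientFieldSketch implements Serializable
{
  private static final long serialVersionUID = 1L;

  // Rebuilt in setup(), so it is excluded from the serialized state.
  private transient LiveConnection client;
  private final String endpoint;

  TransientFieldSketch(String endpoint)
  {
    this.endpoint = endpoint;
  }

  void setup()
  {
    client = new LiveConnection();
  }

  boolean isConnected()
  {
    return client != null;
  }

  String endpoint()
  {
    return endpoint;
  }

  // Serializes and deserializes this object, mimicking a save and restore.
  TransientFieldSketch roundTrip()
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (TransientFieldSketch)in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

After a round trip the configuration (`endpoint`) is preserved while the connection is absent until `setup()` runs again, which matches the lifecycle the comment relies on.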



[GitHub] incubator-apex-malhar pull request: MLHR-1936 Adding NiFi operator...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r49067569
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java ---
    @@ -0,0 +1,118 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.nifi;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * This is the base implementation of a NiFi output operator.&nbsp;
    + * A concrete operator should be created from this skeleton implementation.
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Can have any number of input ports<br>
    + * <b>Output</b>: no output port<br>
    + * <br>
    + * Properties:<br>
    + * None<br>
    + * <br>
    + * Compile time checks:<br>
    + * None<br>
    + * <br>
    + * Run time checks:<br>
    + * None<br>
    + * <br>
    + * Benchmarks:<br>
    + * TBD<br>
    + * <br>
    + * </p>
    + *
    + * @displayName Abstract NiFi Output
    + * @category Messaging
    + * @tags output operator
    + *
    + * @since 3.3.0
    + *
    + */
    +public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator
    +{
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiOutputOperator.class);
    +
    +  protected SiteToSiteClient client;
    --- End diff --
    
    Can you please make this transient, since it gets initialized in setup?

