You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/30 20:31:18 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1936 Adding NiFi operators to contrib

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 a44e81a83 -> 98495ab62


MLHR-1936 Adding NiFi operators to contrib


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2cb84942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2cb84942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2cb84942

Branch: refs/heads/devel-3
Commit: 2cb849422560e53973decc1e72285ac3bda357f9
Parents: 9e77ef7
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Dec 14 17:34:59 2015 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Jan 26 16:04:32 2016 -0500

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../contrib/nifi/AbstractNiFiInputOperator.java | 212 +++++++++++++++++
 .../nifi/AbstractNiFiOutputOperator.java        | 187 +++++++++++++++
 .../AbstractNiFiSinglePortInputOperator.java    |  83 +++++++
 .../contrib/nifi/NiFiDataPacket.java            |  42 ++++
 .../contrib/nifi/NiFiDataPacketBuilder.java     |  33 +++
 .../nifi/NiFiSinglePortInputOperator.java       |  72 ++++++
 .../nifi/NiFiSinglePortOutputOperator.java      | 126 ++++++++++
 .../contrib/nifi/StandardNiFiDataPacket.java    |  59 +++++
 .../nifi/NiFiSinglePortInputOperatorTest.java   | 200 ++++++++++++++++
 .../nifi/NiFiSinglePortOutputOperatorTest.java  | 231 +++++++++++++++++++
 .../nifi/demo/TestNiFiInputApplication.java     |  63 +++++
 .../nifi/demo/TestNiFiOutputApplication.java    |  85 +++++++
 .../contrib/nifi/mock/MockDataPacket.java       |  58 +++++
 .../contrib/nifi/mock/MockSiteToSiteClient.java | 102 ++++++++
 .../contrib/nifi/mock/MockTransaction.java      | 166 +++++++++++++
 16 files changed, 1725 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 16e28c8..17b6008 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -220,6 +220,12 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-site-to-site-client</artifactId>
+      <version>0.4.1</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
       <version>2.7.8</version>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
new file mode 100644
index 0000000..d0130f6
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is the base implementation of a NiFi input operator.&nbsp;
+ * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
+ * <p>
+ * Ports:<br>
+ * <b>Input</b>: No input port<br>
+ * <b>Output</b>: Can have any number of output ports<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)&nbsp;
+ * and createTuple(DataPacket dp)<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * </p>
+ *
+ * @displayName Abstract NiFi Input
+ * @category Messaging
+ * @tags input operator
+ */
+
+public abstract class AbstractNiFiInputOperator<T> implements InputOperator
+{
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
+
+  private transient SiteToSiteClient client;
+  private final SiteToSiteClient.Builder siteToSiteBuilder;
+
+  private transient int operatorContextId;
+  private transient long currentWindowId;
+  private transient List<T> currentWindowTuples;
+  private transient List<T> recoveredTuples;
+  private final WindowDataManager windowDataManager;
+
+  /**
+   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+   * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+   */
+  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+      final WindowDataManager windowDataManager)
+  {
+    this.siteToSiteBuilder = siteToSiteBuilder;
+    this.windowDataManager = windowDataManager;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    this.client = siteToSiteBuilder.build();
+    this.operatorContextId = context.getId();
+    this.currentWindowTuples = new ArrayList<>();
+    this.recoveredTuples = new ArrayList<>();
+    this.windowDataManager.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+
+    // if the current window is now less than the largest window, then we need to replay data
+    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+      try {
+        List<T> recoveredData =  (List<T>)this.windowDataManager.load(operatorContextId, windowId);
+        if (recoveredData == null) {
+          return;
+        }
+
+        // if we recovered tuples then load them to be processed by next call to emitTuples()
+        recoveredTuples.addAll(recoveredData);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    // if we have recovered tuples we must be replaying a previous window so emit them,
+    // clear the recovered list, and return until we have no more recovered data
+    if (recoveredTuples.size() > 0) {
+      emitTuples(recoveredTuples);
+      recoveredTuples.clear();
+      return;
+    }
+
+    // no recovered data so start a transaction and pull new data
+    try {
+      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+      if (transaction == null) {
+        LOGGER.warn("A transaction could not be created, returning...");
+        return;
+      }
+
+      DataPacket dataPacket = transaction.receive();
+      if (dataPacket == null) {
+        transaction.confirm();
+        transaction.complete();
+        LOGGER.debug("No data available to pull, returning and will try again...");
+        return;
+      }
+
+      // read all of the available data packets and convert to the given type
+      final List<T> tuples = new ArrayList<>();
+      do {
+        tuples.add(createTuple(dataPacket));
+        dataPacket = transaction.receive();
+      } while (dataPacket != null);
+
+      // confirm all of the expected data was received by comparing check-sums, does not complete the transaction
+      transaction.confirm();
+
+      // ensure we have the data saved before proceeding in case anything goes wrong
+      currentWindowTuples.addAll(tuples);
+      windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId);
+
+      // we now have the data saved so we can complete the transaction
+      transaction.complete();
+
+      // delegate to sub-classes to emit the tuples
+      emitTuples(tuples);
+
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  /**
+   * Provides mechanism for converting a DataPacket to the given type.
+   *
+   * @param dataPacket a DataPacket from the NiFi Site-To-Site client.
+   * @return the given type of tuple
+   */
+  protected abstract T createTuple(final DataPacket dataPacket) throws IOException;
+
+  /**
+   * Provided mechanism to emit the list of tuples for follow-on processing.
+   *
+   * @param tuples a list of tuples received from NiFi.
+   */
+  protected abstract void emitTuples(final List<T> tuples);
+
+  @Override
+  public void endWindow()
+  {
+    // save the final state of the window and clear the current window list
+    try {
+      windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    currentWindowTuples.clear();
+  }
+
+  @Override
+  public void teardown()
+  {
+    LOGGER.debug("Tearing down operator...");
+    windowDataManager.teardown();
+    try {
+      client.close();
+    } catch (IOException e) {
+      throw new RuntimeException("Error closing SiteToSiteClient", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
new file mode 100644
index 0000000..e4e61fb
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is the base implementation of a NiFi output operator.&nbsp;
+ * A concrete operator should be created from this skeleton implementation.
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: Can have any number of input ports<br>
+ * <b>Output</b>: no output port<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * None<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Abstract NiFi Output
+ * @category Messaging
+ * @tags output operator
+ *
+ */
+public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator
+{
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiOutputOperator.class);
+
+  protected final SiteToSiteClient.Builder siteToSiteBuilder;
+  protected final NiFiDataPacketBuilder<T> dataPacketBuilder;
+  protected final WindowDataManager windowDataManager;
+
+  protected transient SiteToSiteClient client;
+
+  private transient int operatorContextId;
+  private transient long currentWindowId;
+  private transient long largestRecoveryWindowId;
+  protected transient boolean skipProcessingTuple = false;
+
+  /**
+   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+   * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data
+   * @param windowDataManager  a WindowDataManager to save and load state for windows of tuples
+   */
+  public AbstractNiFiOutputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+      final NiFiDataPacketBuilder<T> dataPacketBuilder, final WindowDataManager windowDataManager)
+  {
+    this.siteToSiteBuilder = siteToSiteBuilder;
+    this.dataPacketBuilder = dataPacketBuilder;
+    this.windowDataManager = windowDataManager;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    this.client = siteToSiteBuilder.build();
+    this.operatorContextId = context.getId();
+    this.windowDataManager.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow();
+
+    // if processing a window we've already seen, don't resend the tuples
+    if (currentWindowId <= largestRecoveryWindowId) {
+      skipProcessingTuple = true;
+    } else {
+      skipProcessingTuple = false;
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // if replaying then nothing to do
+    if (currentWindowId <= largestRecoveryWindowId) {
+      return;
+    }
+
+    // if processing a new window then give sub-classes a chance to take action
+    endNewWindow();
+
+    // mark that we processed the window
+    try {
+      windowDataManager.save("processedWindow", operatorContextId, currentWindowId);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  /**
+   * Called in endWindow() to give sub-classes a chance to take action when processing a new window.
+   *
+   * If the current window is <= the largest recovery window, this method will never be called.
+   */
+  protected abstract void endNewWindow();
+
+
+  @Override
+  public void teardown()
+  {
+    LOGGER.debug("Tearing down operator...");
+    windowDataManager.teardown();
+    try {
+      client.close();
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  /**
+   * Send the given batch of tuples to NiFi in a transaction, using the provided builder to
+   * first convert each tuple into a NiFiDataPacket.
+   *
+   * @param tuples a list of tuples to process
+   */
+  protected void processTuples(List<T> tuples)
+  {
+    if (tuples == null || tuples.size() == 0) {
+      return;
+    }
+
+    // create a transaction and send the data packets
+    try {
+      final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+      if (transaction == null) {
+        throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+      }
+
+      // convert each tuple to a NiFiDataPacket using the provided builder
+      for (T tuple : tuples) {
+        NiFiDataPacket dp = dataPacketBuilder.createNiFiDataPacket(tuple);
+        transaction.send(dp.getContent(), dp.getAttributes());
+      }
+
+      transaction.confirm();
+      transaction.complete();
+    } catch (IOException ioe) {
+      DTThrowable.rethrow(ioe);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
new file mode 100644
index 0000000..d874be0
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.util.List;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * This is the base implementation of a NiFi input operator with a single output port.&nbsp;
+ * Subclasses should implement the methods which convert NiFi DataPackets to tuples.
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: No input port<br>
+ * <b>Output</b>: Have only one output port<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * Class derived from this has to implement the abstract method createTuple(DataPacket dp) <br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Abstract NiFi Single Port Input
+ * @category Messaging
+ * @tags input operator
+ *
+ */
+public abstract class AbstractNiFiSinglePortInputOperator<T> extends AbstractNiFiInputOperator<T>
+{
+
+  /**
+   * This is the output port on which tuples extracted from NiFi data packets are emitted.
+   */
+  public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>();
+
+  /**
+   *
+   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+   * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+   */
+  public AbstractNiFiSinglePortInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+      final WindowDataManager windowDataManager)
+  {
+    super(siteToSiteBuilder, windowDataManager);
+  }
+
+  @Override
+  protected void emitTuples(final List<T> tuples)
+  {
+    for (T tuple : tuples) {
+      outputPort.emit(tuple);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java
new file mode 100644
index 0000000..9c66056
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by Apex.
+ * </p>
+ */
+public interface NiFiDataPacket
+{
+
+  /**
+   * @return the contents of a NiFi FlowFile
+   */
+  byte[] getContent();
+
+  /**
+   * @return a Map of attributes that are associated with the NiFi FlowFile
+   */
+  Map<String, String> getAttributes();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..4b71792
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.Serializable;
+
+/**
+ * Creates a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T> the type that a NiFiDataPacket is being created from
+ */
+public interface NiFiDataPacketBuilder<T> extends Serializable
+{
+
+  NiFiDataPacket createNiFiDataPacket(T t);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
new file mode 100644
index 0000000..f80386d
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * Input adapter operator which consumes data from NiFi and produces NiFiDataPackets
+ * where each NiFiDataPacket contains a byte array of content and a Map of attributes.
+ *
+ * @displayName NiFi Input Operator
+ * @category Messaging
+ * @tags input operator
+ *
+ */
+public class NiFiSinglePortInputOperator extends AbstractNiFiSinglePortInputOperator<NiFiDataPacket>
+{
+
+  // required by Kyro serialization
+  private NiFiSinglePortInputOperator()
+  {
+    super(null, null);
+  }
+
+  /**
+   *
+   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+   * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+   */
+  public NiFiSinglePortInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+      final WindowDataManager windowDataManager)
+  {
+    super(siteToSiteBuilder, windowDataManager);
+  }
+
+  @Override
+  protected NiFiDataPacket createTuple(final DataPacket dataPacket) throws IOException
+  {
+    // read the data into a byte array and wrap it with the attributes into a NiFiDataPacket
+    final InputStream inStream = dataPacket.getData();
+    final byte[] data = new byte[(int)dataPacket.getSize()];
+    StreamUtils.fillBuffer(inStream, data);
+
+    final Map<String, String> attributes = dataPacket.getAttributes();
+    return new StandardNiFiDataPacket(data, attributes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
new file mode 100644
index 0000000..2692034
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * NiFi output adapter operator with a single input port. Clients should provide a NiFiDataPacketBuilder implementation
+ * for converting incoming tuples to NiFiDataPackets.
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: Have only one input port<br>
+ * <b>Output</b>: No output port<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * None<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ * </p>
+ *
+ * @displayName NiFi Single Port Output
+ * @category Messaging
+ * @tags output operator
+ *
+ */
+public class NiFiSinglePortOutputOperator<T> extends AbstractNiFiOutputOperator<T>
+{
+
+  public final transient BufferingInputPort inputPort;
+
+  // required by Kyro serialization
+  private NiFiSinglePortOutputOperator()
+  {
+    this(null, null, null, 0);
+  }
+
+  /**
+   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+   * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data
+   * @param windowDataManager  a WindowDataManager to save and load state for windows of tuples
+   * @param batchSize the maximum number of tuples to send to NiFi in a single transaction
+   */
+  public NiFiSinglePortOutputOperator(
+      final SiteToSiteClient.Builder siteToSiteBuilder,
+      final NiFiDataPacketBuilder<T> dataPacketBuilder,
+      final WindowDataManager windowDataManager,
+      final int batchSize)
+  {
+    super(siteToSiteBuilder, dataPacketBuilder, windowDataManager);
+    this.inputPort = new BufferingInputPort(batchSize);
+  }
+
+  @Override
+  protected void endNewWindow()
+  {
+    // flush any tuples that may have been buffered between the last flush and endWindow()
+    inputPort.flush();
+  }
+
+  /**
+   * An InputPort that accumulates tuples up to the provided batch size before flushing.
+   */
+  public class BufferingInputPort extends DefaultInputPort<T>
+  {
+
+    private final int batchSize;
+    private final List<T> tuples;
+
+    public BufferingInputPort(final int batchSize)
+    {
+      this.tuples = new ArrayList<>();
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public void process(T tuple)
+    {
+      if (!skipProcessingTuple) {
+        tuples.add(tuple);
+
+        if (tuples.size() >= batchSize) {
+          flush();
+        }
+      }
+    }
+
+    public void flush()
+    {
+      processTuples(tuples);
+      tuples.clear();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..6821adf
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An implementation of NiFiDataPacket.
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable
+{
+  private static final long serialVersionUID = 6364005260220243322L;
+
+  private final byte[] content;
+  private final Map<String, String> attributes;
+
+  // required by Kyro serialization
+  public StandardNiFiDataPacket()
+  {
+    this.content = null;
+    this.attributes = null;
+  }
+
+  public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes)
+  {
+    this.content = content;
+    this.attributes = attributes;
+  }
+
+  @Override
+  public byte[] getContent()
+  {
+    return content;
+  }
+
+  @Override
+  public Map<String, String> getAttributes()
+  {
+    return attributes;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
new file mode 100644
index 0000000..a25497b
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.file.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.contrib.nifi.mock.MockDataPacket;
+import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.WindowDataManager;
+
+public class NiFiSinglePortInputOperatorTest
+{
+
+  private MockSiteToSiteClient.Builder builder;
+  private CollectorTestSink<Object> sink;
+  private Context.OperatorContext context;
+  private WindowDataManager windowDataManager;
+  private NiFiSinglePortInputOperator operator;
+
+  @Before
+  public void setup() throws IOException
+  {
+    final String windowDataDir = "target/" + this.getClass().getSimpleName();
+    final File windowDataDirFile = new File(windowDataDir);
+    if (windowDataDirFile.exists()) {
+      FileUtils.deleteFile(windowDataDirFile, true);
+    }
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_PATH, windowDataDir);
+
+    context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap);
+
+    sink = new CollectorTestSink<>();
+    builder = new MockSiteToSiteClient.Builder();
+    windowDataManager = new WindowDataManager.FSWindowDataManager();
+
+    operator = new NiFiSinglePortInputOperator(builder, windowDataManager);
+    operator.outputPort.setSink(sink);
+  }
+
+  @After
+  public void teardown() {
+    if (operator != null) {
+      operator.teardown();
+    }
+  }
+
+  @Test
+  public void testSimpleInput() throws IOException
+  {
+    // create some mock packets and queue them in the builder before running the operator
+    final List<DataPacket> dataPackets = getDataPackets(4);
+    builder.queue(dataPackets);
+
+    operator.setup(context);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+
+    Assert.assertEquals("Size of collected tuples should equal size of mock data packets",
+        dataPackets.size(), sink.collectedTuples.size());
+
+    operator.beginWindow(2);
+    operator.emitTuples();
+    operator.endWindow();
+
+    Assert.assertEquals("Size of collected tuples should still equal size of mock data packets",
+        dataPackets.size(), sink.collectedTuples.size());
+
+    // verify that the collector sink got all the expected content
+    List<String> expectedContents = Arrays.asList("content1", "content2", "content3", "content4");
+    verifyContents(expectedContents, sink.collectedTuples);
+
+    // reinitialize the data manager so it picks up the saved data
+    windowDataManager.setup(context);
+
+    // verify that all the data packets were saved for window #1
+    List<StandardNiFiDataPacket> windowData = (List<StandardNiFiDataPacket>) windowDataManager.load(context.getId(), 1);
+    Assert.assertNotNull("Should have recovered data", windowData);
+    Assert.assertEquals("Size of recovered data should equal size of mock data packets",
+        dataPackets.size(), windowData.size());
+  }
+
+  @Test
+  public void testRecoveryAndIdempotency()
+  {
+    // create some mock packets and queue them in the builder before running the operator
+    final List<DataPacket> dataPackets = getDataPackets(4);
+    builder.queue(dataPackets);
+
+    operator.setup(context);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+
+    Assert.assertEquals("Size of collected tuples should equal size of mock data packets",
+        dataPackets.size(), sink.collectedTuples.size());
+
+    // simulate failure and then re-deployment of operator
+
+    sink.collectedTuples.clear();
+    Assert.assertEquals("Should not have collected tuples", 0, sink.collectedTuples.size());
+
+    operator.setup(context);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+
+    Assert.assertEquals("Size of collected tuples should equal size of mock data packets",
+        dataPackets.size(), sink.collectedTuples.size());
+  }
+
+  @NotNull
+  private List<DataPacket> getDataPackets(int size)
+  {
+    List<DataPacket> dataPackets = new ArrayList<>();
+
+    for (int i=1; i <= size; i++) {
+      dataPackets.add(getDataPacket(String.valueOf(i)));
+    }
+    return dataPackets;
+  }
+
+  @NotNull
+  private DataPacket getDataPacket(final String id)
+  {
+    Map<String, String> attrs = new HashMap<>();
+    attrs.put("keyA", "valA");
+    attrs.put("keyB", "valB");
+    attrs.put("key" + id, "val" + id);
+
+    byte[] content = ("content" + id).getBytes(StandardCharsets.UTF_8);
+    ByteArrayInputStream in = new ByteArrayInputStream(content);
+
+    return new MockDataPacket(attrs, in, content.length);
+  }
+
+  private void verifyContents(List<String> expectedContents, List<Object> tuples)
+  {
+    for (String expectedContent : expectedContents) {
+      boolean found = false;
+
+      for (Object obj : tuples) {
+        if (obj instanceof NiFiDataPacket) {
+          NiFiDataPacket dp = (NiFiDataPacket)obj;
+          Assert.assertEquals(3, dp.getAttributes().size());
+
+          String content = new String(dp.getContent(), StandardCharsets.UTF_8);
+          if (content.equals(expectedContent)) {
+            found = true;
+            break;
+          }
+        }
+      }
+
+      Assert.assertTrue(found);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
new file mode 100644
index 0000000..5b58ae0
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.util.file.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
+import com.datatorrent.contrib.nifi.mock.MockTransaction;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.WindowDataManager;
+
+public class NiFiSinglePortOutputOperatorTest
+{
+
+  private Context.OperatorContext context;
+  private WindowDataManager windowDataManager;
+  private MockSiteToSiteClient.Builder stsBuilder;
+  private NiFiDataPacketBuilder<String> dpBuilder;
+  private NiFiSinglePortOutputOperator<String> operator;
+
+  @Before
+  public void setup() throws IOException
+  {
+    final String windowDataDir = "target/" + this.getClass().getSimpleName();
+    final File windowDataDirFile = new File(windowDataDir);
+    if (windowDataDirFile.exists()) {
+      FileUtils.deleteFile(windowDataDirFile, true);
+    }
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_PATH, windowDataDir);
+
+    context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap);
+
+    windowDataManager = new WindowDataManager.FSWindowDataManager();
+
+    stsBuilder = new MockSiteToSiteClient.Builder();
+    dpBuilder = new StringNiFiDataPacketBuilder();
+    operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, 1);
+  }
+
+  @Test
+  public void testTransactionPerTuple() throws IOException
+  {
+    operator.setup(context);
+
+    // get the mock client which will capture each transactions
+    final MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client;
+
+    final String tuple1 = "tuple1";
+    final String tuple2 = "tuple2";
+    final String tuple3 = "tuple3";
+
+    operator.beginWindow(1);
+
+    operator.inputPort.process(tuple1);
+    Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+    operator.inputPort.process(tuple2);
+    Assert.assertEquals(2, mockClient.getMockTransactions().size());
+
+    operator.inputPort.process(tuple3);
+    Assert.assertEquals(3, mockClient.getMockTransactions().size());
+
+    operator.endNewWindow();
+    Assert.assertEquals(3, mockClient.getMockTransactions().size());
+
+    // verify we sent the correct content
+    List<String> expectedContents = Arrays.asList(tuple1, tuple2, tuple3);
+    List<MockTransaction> transactions = mockClient.getMockTransactions();
+
+    verifyTransactions(expectedContents, transactions);
+  }
+
+  @Test
+  public void testBatchSize() throws IOException
+  {
+    final int batchSize = 3;
+    operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, batchSize);
+    operator.setup(context);
+
+    // get the mock client which will capture each transactions
+    final MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client;
+
+    final String tuple1 = "tuple1";
+    final String tuple2 = "tuple2";
+    final String tuple3 = "tuple3";
+    final String tuple4 = "tuple4";
+    final String tuple5 = "tuple5";
+
+    operator.beginWindow(1);
+
+    operator.inputPort.process(tuple1);
+    Assert.assertEquals(0, mockClient.getMockTransactions().size());
+
+    operator.inputPort.process(tuple2);
+    Assert.assertEquals(0, mockClient.getMockTransactions().size());
+
+    // should cause the port to flush and create a transaction
+    operator.inputPort.process(tuple3);
+    Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+    operator.inputPort.process(tuple4);
+    Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+    operator.inputPort.process(tuple5);
+    Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+    // should flush tuples 4 and 5 and cause a new transaction
+    operator.endNewWindow();
+    Assert.assertEquals(2, mockClient.getMockTransactions().size());
+
+    // verify we sent the correct content
+    List<String> expectedContents = Arrays.asList(tuple1, tuple2, tuple3, tuple4, tuple5);
+    List<MockTransaction> transactions = mockClient.getMockTransactions();
+
+    verifyTransactions(expectedContents, transactions);
+  }
+
+  @Test
+  public void testReplay() throws IOException
+  {
+    final String tuple1 = "tuple1";
+    final String tuple2 = "tuple2";
+    final String tuple3 = "tuple3";
+
+    operator.setup(context);
+    operator.beginWindow(1);
+    operator.inputPort.process(tuple1);
+    operator.inputPort.process(tuple2);
+    operator.inputPort.process(tuple3);
+    operator.endWindow();
+
+    // get the mock client which will capture each transactions
+    MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client;
+    Assert.assertEquals(3, mockClient.getMockTransactions().size());
+
+    // simulate replaying window #1
+    operator.setup(context);
+    operator.beginWindow(1);
+    operator.inputPort.process(tuple1);
+    operator.inputPort.process(tuple2);
+    operator.inputPort.process(tuple3);
+    operator.endWindow();
+
+    // should not have created any transactions on the new client
+    mockClient = (MockSiteToSiteClient)operator.client;
+    Assert.assertEquals(0, mockClient.getMockTransactions().size());
+  }
+
+
+  private void verifyTransactions(List<String> expectedContents, List<MockTransaction> transactions) throws IOException
+  {
+    // convert all the data packets in the transactions to strings
+    final List<String> dataPacketContents = new ArrayList<>();
+
+    for (MockTransaction mockTransaction : transactions)
+    {
+      List<DataPacket> dps = mockTransaction.getSentDataPackets();
+      Assert.assertTrue(dps.size() > 0);
+
+      for (DataPacket dp : dps)
+      {
+        final String dpContent = IOUtils.toString(dp.getData());
+        dataPacketContents.add(dpContent);
+      }
+    }
+
+    // verify each expected piece of content is found in the data packet contents
+    for (String expectedContent : expectedContents)
+    {
+      boolean found = false;
+      for (String dataPacket : dataPacketContents)
+      {
+          if (dataPacket.equals(expectedContent))
+          {
+            found = true;
+            break;
+          }
+      }
+      Assert.assertTrue(found);
+    }
+  }
+
+  /**
+   * A builder that can create a NiFiDataPacket from a string.
+   */
+  public static class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String>
+  {
+    @Override
+    public NiFiDataPacket createNiFiDataPacket(String s)
+    {
+      return new StandardNiFiDataPacket(s.getBytes(StandardCharsets.UTF_8), new HashMap<String, String>());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
new file mode 100644
index 0000000..ebe8a0c
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.demo;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.nifi.NiFiSinglePortInputOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * A sample application that shows how to receive data to a NiFi Output Port.
+ */
+public class TestNiFiInputApplication implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+        .url("http://localhost:8080/nifi")
+        .portName("Apex")
+        .requestBatchCount(5)
+        .buildConfig();
+
+    final SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder().fromConfig(clientConfig);
+
+    final WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
+
+    NiFiSinglePortInputOperator nifi = dag.addOperator("nifi", new NiFiSinglePortInputOperator(builder, windowDataManager));
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("nifi_console", nifi.outputPort, console.input).setLocality(null);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    StreamingApplication app = new TestNiFiInputApplication();
+    LocalMode.runApp(app, new Configuration(false), 10000);
+    Thread.sleep(2000);
+    System.exit(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
new file mode 100644
index 0000000..f5399e7
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.demo;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.nifi.NiFiDataPacket;
+import com.datatorrent.contrib.nifi.NiFiDataPacketBuilder;
+import com.datatorrent.contrib.nifi.NiFiSinglePortOutputOperator;
+import com.datatorrent.contrib.nifi.StandardNiFiDataPacket;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * A sample application that shows how to send data to a NiFi Input Port.
+ */
+public class TestNiFiOutputApplication implements StreamingApplication
+{
+
+  /**
+   * A builder that can create a NiFiDataPacket from a string.
+   */
+  public static class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String>
+  {
+    @Override
+    public NiFiDataPacket createNiFiDataPacket(String s)
+    {
+      return new StandardNiFiDataPacket(s.getBytes(StandardCharsets.UTF_8), new HashMap<String, String>());
+    }
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+        .url("http://localhost:8080/nifi")
+        .portName("Apex")
+        .buildConfig();
+
+    final int batchSize = 1;
+    final SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder().fromConfig(clientConfig);
+    final NiFiDataPacketBuilder<String> dataPacketBuilder = new StringNiFiDataPacketBuilder();
+    final WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
+
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+
+    NiFiSinglePortOutputOperator nifi = dag.addOperator("nifi",
+        new NiFiSinglePortOutputOperator(builder, dataPacketBuilder, windowDataManager ,batchSize));
+
+    dag.addStream("rand_nifi", rand.string_data, nifi.inputPort).setLocality(null);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    StreamingApplication app = new TestNiFiOutputApplication();
+    LocalMode.runApp(app, new Configuration(false), 10000);
+    Thread.sleep(2000);
+    System.exit(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java
new file mode 100644
index 0000000..a57a771
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.mock;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public class MockDataPacket implements DataPacket
+{
+
+  private final Map<String, String> attributes;
+  private final InputStream inputStream;
+  private final long size;
+
+  public MockDataPacket(Map<String, String> attributes, InputStream inputStream, long size)
+  {
+    this.attributes = attributes;
+    this.inputStream = inputStream;
+    this.size = size;
+  }
+
+  @Override
+  public Map<String, String> getAttributes()
+  {
+    return attributes;
+  }
+
+  @Override
+  public InputStream getData()
+  {
+    return inputStream;
+  }
+
+  @Override
+  public long getSize()
+  {
+    return size;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java
new file mode 100644
index 0000000..ee8fc55
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public class MockSiteToSiteClient implements SiteToSiteClient
+{
+
+  private final SiteToSiteClientConfig config;
+  private final List<DataPacket> queuedDataPackets;
+  private final Iterator<DataPacket> iter;
+  private final List<MockTransaction> transactions;
+
+  public MockSiteToSiteClient(final MockSiteToSiteClient.Builder builder)
+  {
+    this.config = builder.buildConfig();
+    this.queuedDataPackets = (builder.queuedDataPackets == null ?
+        new ArrayList<DataPacket>() : builder.queuedDataPackets);
+    this.iter = queuedDataPackets.iterator();
+    this.transactions = new ArrayList<>();
+  }
+
+  @Override
+  public Transaction createTransaction(TransferDirection direction) throws IOException
+  {
+    MockTransaction transaction = new MockTransaction(iter);
+    transactions.add(transaction);
+    return transaction;
+  }
+
+  @Override
+  public boolean isSecure() throws IOException
+  {
+    return false;
+  }
+
+  @Override
+  public SiteToSiteClientConfig getConfig()
+  {
+    return config;
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    // nothing to do
+  }
+
+  public List<MockTransaction> getMockTransactions()
+  {
+    return Collections.unmodifiableList(transactions);
+  }
+
+  /**
+   * A builder for MockSiteToSiteClients.
+   */
+  public static class Builder extends SiteToSiteClient.Builder
+  {
+
+    private List<DataPacket> queuedDataPackets;
+
+    public MockSiteToSiteClient.Builder queue(List<DataPacket> queuedDataPackets)
+    {
+      this.queuedDataPackets = new ArrayList<>(queuedDataPackets);
+      return this;
+    }
+
+    @Override
+    public SiteToSiteClient build()
+    {
+      return new MockSiteToSiteClient(this);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java
new file mode 100644
index 0000000..373f62b
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.Communicant;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+
+/**
+ * A mock Transaction that will return the data passed in the constructor on calls to receive(),
+ * and will store the data packets passed to send().
+ */
+public class MockTransaction implements Transaction
+{
+
+  private Iterator<DataPacket> dataPacketIter;
+
+  private List<DataPacket> sentDataPackets = new ArrayList<>();
+
+  public MockTransaction(Iterator<DataPacket> iter)
+  {
+    this.dataPacketIter = iter;
+  }
+
+  @Override
+  public void send(DataPacket dataPacket) throws IOException
+  {
+    if (dataPacket != null) {
+      this.sentDataPackets.add(dataPacket);
+    }
+  }
+
+  @Override
+  public void send(byte[] content, Map<String, String> attributes) throws IOException
+  {
+    this.sentDataPackets.add(new MockDataPacket(attributes, new ByteArrayInputStream(content), content.length));
+  }
+
+  public List<DataPacket> getSentDataPackets()
+  {
+    return Collections.unmodifiableList(sentDataPackets);
+  }
+
+  @Override
+  public DataPacket receive() throws IOException
+  {
+    if (dataPacketIter != null && dataPacketIter.hasNext()) {
+      return dataPacketIter.next();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void confirm() throws IOException
+  {
+
+  }
+
+  @Override
+  public TransactionCompletion complete() throws IOException
+  {
+    return new TransactionCompletion()
+    {
+      @Override
+      public boolean isBackoff()
+      {
+        return false;
+      }
+
+      @Override
+      public int getDataPacketsTransferred()
+      {
+        return 0;
+      }
+
+      @Override
+      public long getBytesTransferred()
+      {
+        return 0;
+      }
+
+      @Override
+      public long getDuration(TimeUnit timeUnit)
+      {
+        return 0;
+      }
+    };
+  }
+
+  @Override
+  public void cancel(String explanation) throws IOException
+  {
+
+  }
+
+  @Override
+  public void error()
+  {
+
+  }
+
+  @Override
+  public TransactionState getState() throws IOException
+  {
+    return TransactionState.TRANSACTION_COMPLETED;
+  }
+
+  @Override
+  public Communicant getCommunicant()
+  {
+    return new Communicant()
+    {
+      @Override
+      public String getUrl()
+      {
+        return null;
+      }
+
+      @Override
+      public String getHost()
+      {
+        return null;
+      }
+
+      @Override
+      public int getPort()
+      {
+        return 0;
+      }
+
+      @Override
+      public String getDistinguishedName()
+      {
+        return null;
+      }
+    };
+  }
+
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1936.nifi-connector' of https://github.com/bbende/incubator-apex-malhar into devel-3

Posted by th...@apache.org.
Merge branch 'MLHR-1936.nifi-connector' of https://github.com/bbende/incubator-apex-malhar into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/98495ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/98495ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/98495ab6

Branch: refs/heads/devel-3
Commit: 98495ab62be21cc6a5ebcff32a27dd9a02e5ea23
Parents: a44e81a 2cb8494
Author: Thomas Weise <th...@datatorrent.com>
Authored: Sat Jan 30 11:29:32 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Jan 30 11:29:32 2016 -0800

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../contrib/nifi/AbstractNiFiInputOperator.java | 212 +++++++++++++++++
 .../nifi/AbstractNiFiOutputOperator.java        | 187 +++++++++++++++
 .../AbstractNiFiSinglePortInputOperator.java    |  83 +++++++
 .../contrib/nifi/NiFiDataPacket.java            |  42 ++++
 .../contrib/nifi/NiFiDataPacketBuilder.java     |  33 +++
 .../nifi/NiFiSinglePortInputOperator.java       |  72 ++++++
 .../nifi/NiFiSinglePortOutputOperator.java      | 126 ++++++++++
 .../contrib/nifi/StandardNiFiDataPacket.java    |  59 +++++
 .../nifi/NiFiSinglePortInputOperatorTest.java   | 200 ++++++++++++++++
 .../nifi/NiFiSinglePortOutputOperatorTest.java  | 231 +++++++++++++++++++
 .../nifi/demo/TestNiFiInputApplication.java     |  63 +++++
 .../nifi/demo/TestNiFiOutputApplication.java    |  85 +++++++
 .../contrib/nifi/mock/MockDataPacket.java       |  58 +++++
 .../contrib/nifi/mock/MockSiteToSiteClient.java | 102 ++++++++
 .../contrib/nifi/mock/MockTransaction.java      | 166 +++++++++++++
 16 files changed, 1725 insertions(+)
----------------------------------------------------------------------