You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/30 20:31:18 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1936 Adding NiFi
operators to contrib
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 a44e81a83 -> 98495ab62
MLHR-1936 Adding NiFi operators to contrib
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2cb84942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2cb84942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2cb84942
Branch: refs/heads/devel-3
Commit: 2cb849422560e53973decc1e72285ac3bda357f9
Parents: 9e77ef7
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Dec 14 17:34:59 2015 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Jan 26 16:04:32 2016 -0500
----------------------------------------------------------------------
contrib/pom.xml | 6 +
.../contrib/nifi/AbstractNiFiInputOperator.java | 212 +++++++++++++++++
.../nifi/AbstractNiFiOutputOperator.java | 187 +++++++++++++++
.../AbstractNiFiSinglePortInputOperator.java | 83 +++++++
.../contrib/nifi/NiFiDataPacket.java | 42 ++++
.../contrib/nifi/NiFiDataPacketBuilder.java | 33 +++
.../nifi/NiFiSinglePortInputOperator.java | 72 ++++++
.../nifi/NiFiSinglePortOutputOperator.java | 126 ++++++++++
.../contrib/nifi/StandardNiFiDataPacket.java | 59 +++++
.../nifi/NiFiSinglePortInputOperatorTest.java | 200 ++++++++++++++++
.../nifi/NiFiSinglePortOutputOperatorTest.java | 231 +++++++++++++++++++
.../nifi/demo/TestNiFiInputApplication.java | 63 +++++
.../nifi/demo/TestNiFiOutputApplication.java | 85 +++++++
.../contrib/nifi/mock/MockDataPacket.java | 58 +++++
.../contrib/nifi/mock/MockSiteToSiteClient.java | 102 ++++++++
.../contrib/nifi/mock/MockTransaction.java | 166 +++++++++++++
16 files changed, 1725 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 16e28c8..17b6008 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -220,6 +220,12 @@
<dependencies>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-site-to-site-client</artifactId>
+ <version>0.4.1</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.7.8</version>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
new file mode 100644
index 0000000..d0130f6
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is the base implementation of a NiFi input operator.
+ * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
+ * <p>
+ * Ports:<br>
+ * <b>Input</b>: No input port<br>
+ * <b>Output</b>: Can have any number of output ports<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)
+ * and createTuple(DataPacket dp)<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * </p>
+ *
+ * @displayName Abstract NiFi Input
+ * @category Messaging
+ * @tags input operator
+ */
+
+public abstract class AbstractNiFiInputOperator<T> implements InputOperator
+{
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
+
+ private transient SiteToSiteClient client;
+ private final SiteToSiteClient.Builder siteToSiteBuilder;
+
+ private transient int operatorContextId;
+ private transient long currentWindowId;
+ private transient List<T> currentWindowTuples;
+ private transient List<T> recoveredTuples;
+ private final WindowDataManager windowDataManager;
+
+ /**
+ * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+ * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+ */
+ public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+ final WindowDataManager windowDataManager)
+ {
+ this.siteToSiteBuilder = siteToSiteBuilder;
+ this.windowDataManager = windowDataManager;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ this.client = siteToSiteBuilder.build();
+ this.operatorContextId = context.getId();
+ this.currentWindowTuples = new ArrayList<>();
+ this.recoveredTuples = new ArrayList<>();
+ this.windowDataManager.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+
+ // if the current window is now less than the largest window, then we need to replay data
+ if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+ try {
+ List<T> recoveredData = (List<T>)this.windowDataManager.load(operatorContextId, windowId);
+ if (recoveredData == null) {
+ return;
+ }
+
+ // if we recovered tuples then load them to be processed by next call to emitTuples()
+ recoveredTuples.addAll(recoveredData);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ // if we have recovered tuples we must be replaying a previous window so emit them,
+ // clear the recovered list, and return until we have no more recovered data
+ if (recoveredTuples.size() > 0) {
+ emitTuples(recoveredTuples);
+ recoveredTuples.clear();
+ return;
+ }
+
+ // no recovered data so start a transaction and pull new data
+ try {
+ final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+ if (transaction == null) {
+ LOGGER.warn("A transaction could not be created, returning...");
+ return;
+ }
+
+ DataPacket dataPacket = transaction.receive();
+ if (dataPacket == null) {
+ transaction.confirm();
+ transaction.complete();
+ LOGGER.debug("No data available to pull, returning and will try again...");
+ return;
+ }
+
+ // read all of the available data packets and convert to the given type
+ final List<T> tuples = new ArrayList<>();
+ do {
+ tuples.add(createTuple(dataPacket));
+ dataPacket = transaction.receive();
+ } while (dataPacket != null);
+
+ // confirm all of the expected data was received by comparing check-sums, does not complete the transaction
+ transaction.confirm();
+
+ // ensure we have the data saved before proceeding in case anything goes wrong
+ currentWindowTuples.addAll(tuples);
+ windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId);
+
+ // we now have the data saved so we can complete the transaction
+ transaction.complete();
+
+ // delegate to sub-classes to emit the tuples
+ emitTuples(tuples);
+
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ /**
+ * Provides mechanism for converting a DataPacket to the given type.
+ *
+ * @param dataPacket a DataPacket from the NiFi Site-To-Site client.
+ * @return the given type of tuple
+ */
+ protected abstract T createTuple(final DataPacket dataPacket) throws IOException;
+
+ /**
+ * Provided mechanism to emit the list of tuples for follow-on processing.
+ *
+ * @param tuples a list of tuples received from NiFi.
+ */
+ protected abstract void emitTuples(final List<T> tuples);
+
+ @Override
+ public void endWindow()
+ {
+ // save the final state of the window and clear the current window list
+ try {
+ windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ currentWindowTuples.clear();
+ }
+
+ @Override
+ public void teardown()
+ {
+ LOGGER.debug("Tearing down operator...");
+ windowDataManager.teardown();
+ try {
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Error closing SiteToSiteClient", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
new file mode 100644
index 0000000..e4e61fb
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is the base implementation of a NiFi output operator.
+ * A concrete operator should be created from this skeleton implementation.
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: Can have any number of input ports<br>
+ * <b>Output</b>: no output port<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * None<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Abstract NiFi Output
+ * @category Messaging
+ * @tags output operator
+ *
+ */
+public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator
+{
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiOutputOperator.class);
+
+ protected final SiteToSiteClient.Builder siteToSiteBuilder;
+ protected final NiFiDataPacketBuilder<T> dataPacketBuilder;
+ protected final WindowDataManager windowDataManager;
+
+ protected transient SiteToSiteClient client;
+
+ private transient int operatorContextId;
+ private transient long currentWindowId;
+ private transient long largestRecoveryWindowId;
+ protected transient boolean skipProcessingTuple = false;
+
+ /**
+ * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+ * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data
+ * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+ */
+ public AbstractNiFiOutputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+ final NiFiDataPacketBuilder<T> dataPacketBuilder, final WindowDataManager windowDataManager)
+ {
+ this.siteToSiteBuilder = siteToSiteBuilder;
+ this.dataPacketBuilder = dataPacketBuilder;
+ this.windowDataManager = windowDataManager;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ this.client = siteToSiteBuilder.build();
+ this.operatorContextId = context.getId();
+ this.windowDataManager.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow();
+
+ // if processing a window we've already seen, don't resend the tuples
+ if (currentWindowId <= largestRecoveryWindowId) {
+ skipProcessingTuple = true;
+ } else {
+ skipProcessingTuple = false;
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ // if replaying then nothing to do
+ if (currentWindowId <= largestRecoveryWindowId) {
+ return;
+ }
+
+ // if processing a new window then give sub-classes a chance to take action
+ endNewWindow();
+
+ // mark that we processed the window
+ try {
+ windowDataManager.save("processedWindow", operatorContextId, currentWindowId);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ /**
+ * Called in endWindow() to give sub-classes a chance to take action when processing a new window.
+ *
+ * If the current window is <= the largest recovery window, this method will never be called.
+ */
+ protected abstract void endNewWindow();
+
+
+ @Override
+ public void teardown()
+ {
+ LOGGER.debug("Tearing down operator...");
+ windowDataManager.teardown();
+ try {
+ client.close();
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ /**
+ * Send the given batch of tuples to NiFi in a transaction, using the provided builder to
+ * first convert each tuple into a NiFiDataPacket.
+ *
+ * @param tuples a list of tuples to process
+ */
+ protected void processTuples(List<T> tuples)
+ {
+ if (tuples == null || tuples.size() == 0) {
+ return;
+ }
+
+ // create a transaction and send the data packets
+ try {
+ final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+ if (transaction == null) {
+ throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+ }
+
+ // convert each tuple to a NiFiDataPacket using the provided builder
+ for (T tuple : tuples) {
+ NiFiDataPacket dp = dataPacketBuilder.createNiFiDataPacket(tuple);
+ transaction.send(dp.getContent(), dp.getAttributes());
+ }
+
+ transaction.confirm();
+ transaction.complete();
+ } catch (IOException ioe) {
+ DTThrowable.rethrow(ioe);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
new file mode 100644
index 0000000..d874be0
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiSinglePortInputOperator.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.util.List;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * This is the base implementation of a NiFi input operator with a single output port.
+ * Subclasses should implement the methods which convert NiFi DataPackets to tuples.
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: No input port<br>
+ * <b>Output</b>: Have only one output port<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * Class derived from this has to implement the abstract method createTuple(DataPacket dp) <br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Abstract NiFi Single Port Input
+ * @category Messaging
+ * @tags input operator
+ *
+ */
+public abstract class AbstractNiFiSinglePortInputOperator<T> extends AbstractNiFiInputOperator<T>
+{
+
+ /**
+ * This is the output port on which tuples extracted from NiFi data packets are emitted.
+ */
+ public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>();
+
+ /**
+ *
+ * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+ * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+ */
+ public AbstractNiFiSinglePortInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+ final WindowDataManager windowDataManager)
+ {
+ super(siteToSiteBuilder, windowDataManager);
+ }
+
+ @Override
+ protected void emitTuples(final List<T> tuples)
+ {
+ for (T tuple : tuples) {
+ outputPort.emit(tuple);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java
new file mode 100644
index 0000000..9c66056
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacket.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by Apex.
+ * </p>
+ */
+public interface NiFiDataPacket
+{
+
+ /**
+ * @return the contents of a NiFi FlowFile
+ */
+ byte[] getContent();
+
+ /**
+ * @return a Map of attributes that are associated with the NiFi FlowFile
+ */
+ Map<String, String> getAttributes();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..4b71792
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.Serializable;
+
+/**
+ * Creates a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T> the type that a NiFiDataPacket is being created from
+ */
+public interface NiFiDataPacketBuilder<T> extends Serializable
+{
+
+ NiFiDataPacket createNiFiDataPacket(T t);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
new file mode 100644
index 0000000..f80386d
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * Input adapter operator which consumes data from NiFi and produces NiFiDataPackets
+ * where each NiFiDataPacket contains a byte array of content and a Map of attributes.
+ *
+ * @displayName NiFi Input Operator
+ * @category Messaging
+ * @tags input operator
+ *
+ */
+public class NiFiSinglePortInputOperator extends AbstractNiFiSinglePortInputOperator<NiFiDataPacket>
+{
+
+ // required by Kyro serialization
+ private NiFiSinglePortInputOperator()
+ {
+ super(null, null);
+ }
+
+ /**
+ *
+ * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+ * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+ */
+ public NiFiSinglePortInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder,
+ final WindowDataManager windowDataManager)
+ {
+ super(siteToSiteBuilder, windowDataManager);
+ }
+
+ @Override
+ protected NiFiDataPacket createTuple(final DataPacket dataPacket) throws IOException
+ {
+ // read the data into a byte array and wrap it with the attributes into a NiFiDataPacket
+ final InputStream inStream = dataPacket.getData();
+ final byte[] data = new byte[(int)dataPacket.getSize()];
+ StreamUtils.fillBuffer(inStream, data);
+
+ final Map<String, String> attributes = dataPacket.getAttributes();
+ return new StandardNiFiDataPacket(data, attributes);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
new file mode 100644
index 0000000..2692034
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperator.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * NiFi output adapter operator with a single input port. Clients should provide a NiFiDataPacketBuilder implementation
+ * for converting incoming tuples to NiFiDataPackets.
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: Have only one input port<br>
+ * <b>Output</b>: No output port<br>
+ * <br>
+ * Properties:<br>
+ * None<br>
+ * <br>
+ * Compile time checks:<br>
+ * None<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ * </p>
+ *
+ * @displayName NiFi Single Port Output
+ * @category Messaging
+ * @tags output operator
+ *
+ */
+public class NiFiSinglePortOutputOperator<T> extends AbstractNiFiOutputOperator<T>
+{
+
+ public final transient BufferingInputPort inputPort;
+
+ // required by Kyro serialization
+ private NiFiSinglePortOutputOperator()
+ {
+ this(null, null, null, 0);
+ }
+
+ /**
+ * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
+ * @param dataPacketBuilder a builder to produce NiFiDataPackets from incoming data
+ * @param windowDataManager a WindowDataManager to save and load state for windows of tuples
+ * @param batchSize the maximum number of tuples to send to NiFi in a single transaction
+ */
+ public NiFiSinglePortOutputOperator(
+ final SiteToSiteClient.Builder siteToSiteBuilder,
+ final NiFiDataPacketBuilder<T> dataPacketBuilder,
+ final WindowDataManager windowDataManager,
+ final int batchSize)
+ {
+ super(siteToSiteBuilder, dataPacketBuilder, windowDataManager);
+ this.inputPort = new BufferingInputPort(batchSize);
+ }
+
+ @Override
+ protected void endNewWindow()
+ {
+ // flush any tuples that may have been buffered between the last flush and endWindow()
+ inputPort.flush();
+ }
+
+ /**
+ * An InputPort that accumulates tuples up to the provided batch size before flushing.
+ */
+ public class BufferingInputPort extends DefaultInputPort<T>
+ {
+
+ private final int batchSize;
+ private final List<T> tuples;
+
+ public BufferingInputPort(final int batchSize)
+ {
+ this.tuples = new ArrayList<>();
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void process(T tuple)
+ {
+ if (!skipProcessingTuple) {
+ tuples.add(tuple);
+
+ if (tuples.size() >= batchSize) {
+ flush();
+ }
+ }
+ }
+
+ public void flush()
+ {
+ processTuples(tuples);
+ tuples.clear();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..6821adf
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An implementation of NiFiDataPacket.
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable
+{
+ private static final long serialVersionUID = 6364005260220243322L;
+
+ private final byte[] content;
+ private final Map<String, String> attributes;
+
+ // required by Kyro serialization
+ public StandardNiFiDataPacket()
+ {
+ this.content = null;
+ this.attributes = null;
+ }
+
+ public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes)
+ {
+ this.content = content;
+ this.attributes = attributes;
+ }
+
+ @Override
+ public byte[] getContent()
+ {
+ return content;
+ }
+
+ @Override
+ public Map<String, String> getAttributes()
+ {
+ return attributes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
new file mode 100644
index 0000000..a25497b
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.file.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.contrib.nifi.mock.MockDataPacket;
+import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.WindowDataManager;
+
+public class NiFiSinglePortInputOperatorTest
+{
+
+ private MockSiteToSiteClient.Builder builder;
+ private CollectorTestSink<Object> sink;
+ private Context.OperatorContext context;
+ private WindowDataManager windowDataManager;
+ private NiFiSinglePortInputOperator operator;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final String windowDataDir = "target/" + this.getClass().getSimpleName();
+ final File windowDataDirFile = new File(windowDataDir);
+ if (windowDataDirFile.exists()) {
+ FileUtils.deleteFile(windowDataDirFile, true);
+ }
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_PATH, windowDataDir);
+
+ context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap);
+
+ sink = new CollectorTestSink<>();
+ builder = new MockSiteToSiteClient.Builder();
+ windowDataManager = new WindowDataManager.FSWindowDataManager();
+
+ operator = new NiFiSinglePortInputOperator(builder, windowDataManager);
+ operator.outputPort.setSink(sink);
+ }
+
+ @After
+ public void teardown() {
+ if (operator != null) {
+ operator.teardown();
+ }
+ }
+
+ @Test
+ public void testSimpleInput() throws IOException
+ {
+ // create some mock packets and queue them in the builder before running the operator
+ final List<DataPacket> dataPackets = getDataPackets(4);
+ builder.queue(dataPackets);
+
+ operator.setup(context);
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+
+ Assert.assertEquals("Size of collected tuples should equal size of mock data packets",
+ dataPackets.size(), sink.collectedTuples.size());
+
+ operator.beginWindow(2);
+ operator.emitTuples();
+ operator.endWindow();
+
+ Assert.assertEquals("Size of collected tuples should still equal size of mock data packets",
+ dataPackets.size(), sink.collectedTuples.size());
+
+ // verify that the collector sink got all the expected content
+ List<String> expectedContents = Arrays.asList("content1", "content2", "content3", "content4");
+ verifyContents(expectedContents, sink.collectedTuples);
+
+ // reinitialize the data manager so it picks up the saved data
+ windowDataManager.setup(context);
+
+ // verify that all the data packets were saved for window #1
+ List<StandardNiFiDataPacket> windowData = (List<StandardNiFiDataPacket>) windowDataManager.load(context.getId(), 1);
+ Assert.assertNotNull("Should have recovered data", windowData);
+ Assert.assertEquals("Size of recovered data should equal size of mock data packets",
+ dataPackets.size(), windowData.size());
+ }
+
+ @Test
+ public void testRecoveryAndIdempotency()
+ {
+ // create some mock packets and queue them in the builder before running the operator
+ final List<DataPacket> dataPackets = getDataPackets(4);
+ builder.queue(dataPackets);
+
+ operator.setup(context);
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+
+ Assert.assertEquals("Size of collected tuples should equal size of mock data packets",
+ dataPackets.size(), sink.collectedTuples.size());
+
+ // simulate failure and then re-deployment of operator
+
+ sink.collectedTuples.clear();
+ Assert.assertEquals("Should not have collected tuples", 0, sink.collectedTuples.size());
+
+ operator.setup(context);
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+
+ Assert.assertEquals("Size of collected tuples should equal size of mock data packets",
+ dataPackets.size(), sink.collectedTuples.size());
+ }
+
+ @NotNull
+ private List<DataPacket> getDataPackets(int size)
+ {
+ List<DataPacket> dataPackets = new ArrayList<>();
+
+ for (int i=1; i <= size; i++) {
+ dataPackets.add(getDataPacket(String.valueOf(i)));
+ }
+ return dataPackets;
+ }
+
+ @NotNull
+ private DataPacket getDataPacket(final String id)
+ {
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("keyA", "valA");
+ attrs.put("keyB", "valB");
+ attrs.put("key" + id, "val" + id);
+
+ byte[] content = ("content" + id).getBytes(StandardCharsets.UTF_8);
+ ByteArrayInputStream in = new ByteArrayInputStream(content);
+
+ return new MockDataPacket(attrs, in, content.length);
+ }
+
+ private void verifyContents(List<String> expectedContents, List<Object> tuples)
+ {
+ for (String expectedContent : expectedContents) {
+ boolean found = false;
+
+ for (Object obj : tuples) {
+ if (obj instanceof NiFiDataPacket) {
+ NiFiDataPacket dp = (NiFiDataPacket)obj;
+ Assert.assertEquals(3, dp.getAttributes().size());
+
+ String content = new String(dp.getContent(), StandardCharsets.UTF_8);
+ if (content.equals(expectedContent)) {
+ found = true;
+ break;
+ }
+ }
+ }
+
+ Assert.assertTrue(found);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
new file mode 100644
index 0000000..5b58ae0
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.util.file.FileUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
+import com.datatorrent.contrib.nifi.mock.MockTransaction;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.WindowDataManager;
+
+public class NiFiSinglePortOutputOperatorTest
+{
+
+ private Context.OperatorContext context;
+ private WindowDataManager windowDataManager;
+ private MockSiteToSiteClient.Builder stsBuilder;
+ private NiFiDataPacketBuilder<String> dpBuilder;
+ private NiFiSinglePortOutputOperator<String> operator;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final String windowDataDir = "target/" + this.getClass().getSimpleName();
+ final File windowDataDirFile = new File(windowDataDir);
+ if (windowDataDirFile.exists()) {
+ FileUtils.deleteFile(windowDataDirFile, true);
+ }
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_PATH, windowDataDir);
+
+ context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap);
+
+ windowDataManager = new WindowDataManager.FSWindowDataManager();
+
+ stsBuilder = new MockSiteToSiteClient.Builder();
+ dpBuilder = new StringNiFiDataPacketBuilder();
+ operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, 1);
+ }
+
+ @Test
+ public void testTransactionPerTuple() throws IOException
+ {
+ operator.setup(context);
+
+ // get the mock client which will capture each transactions
+ final MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client;
+
+ final String tuple1 = "tuple1";
+ final String tuple2 = "tuple2";
+ final String tuple3 = "tuple3";
+
+ operator.beginWindow(1);
+
+ operator.inputPort.process(tuple1);
+ Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+ operator.inputPort.process(tuple2);
+ Assert.assertEquals(2, mockClient.getMockTransactions().size());
+
+ operator.inputPort.process(tuple3);
+ Assert.assertEquals(3, mockClient.getMockTransactions().size());
+
+ operator.endNewWindow();
+ Assert.assertEquals(3, mockClient.getMockTransactions().size());
+
+ // verify we sent the correct content
+ List<String> expectedContents = Arrays.asList(tuple1, tuple2, tuple3);
+ List<MockTransaction> transactions = mockClient.getMockTransactions();
+
+ verifyTransactions(expectedContents, transactions);
+ }
+
+ @Test
+ public void testBatchSize() throws IOException
+ {
+ final int batchSize = 3;
+ operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, batchSize);
+ operator.setup(context);
+
+ // get the mock client which will capture each transactions
+ final MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client;
+
+ final String tuple1 = "tuple1";
+ final String tuple2 = "tuple2";
+ final String tuple3 = "tuple3";
+ final String tuple4 = "tuple4";
+ final String tuple5 = "tuple5";
+
+ operator.beginWindow(1);
+
+ operator.inputPort.process(tuple1);
+ Assert.assertEquals(0, mockClient.getMockTransactions().size());
+
+ operator.inputPort.process(tuple2);
+ Assert.assertEquals(0, mockClient.getMockTransactions().size());
+
+ // should cause the port to flush and create a transaction
+ operator.inputPort.process(tuple3);
+ Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+ operator.inputPort.process(tuple4);
+ Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+ operator.inputPort.process(tuple5);
+ Assert.assertEquals(1, mockClient.getMockTransactions().size());
+
+ // should flush tuples 4 and 5 and cause a new transaction
+ operator.endNewWindow();
+ Assert.assertEquals(2, mockClient.getMockTransactions().size());
+
+ // verify we sent the correct content
+ List<String> expectedContents = Arrays.asList(tuple1, tuple2, tuple3, tuple4, tuple5);
+ List<MockTransaction> transactions = mockClient.getMockTransactions();
+
+ verifyTransactions(expectedContents, transactions);
+ }
+
+ @Test
+ public void testReplay() throws IOException
+ {
+ final String tuple1 = "tuple1";
+ final String tuple2 = "tuple2";
+ final String tuple3 = "tuple3";
+
+ operator.setup(context);
+ operator.beginWindow(1);
+ operator.inputPort.process(tuple1);
+ operator.inputPort.process(tuple2);
+ operator.inputPort.process(tuple3);
+ operator.endWindow();
+
+ // get the mock client which will capture each transactions
+ MockSiteToSiteClient mockClient = (MockSiteToSiteClient)operator.client;
+ Assert.assertEquals(3, mockClient.getMockTransactions().size());
+
+ // simulate replaying window #1
+ operator.setup(context);
+ operator.beginWindow(1);
+ operator.inputPort.process(tuple1);
+ operator.inputPort.process(tuple2);
+ operator.inputPort.process(tuple3);
+ operator.endWindow();
+
+ // should not have created any transactions on the new client
+ mockClient = (MockSiteToSiteClient)operator.client;
+ Assert.assertEquals(0, mockClient.getMockTransactions().size());
+ }
+
+
+ private void verifyTransactions(List<String> expectedContents, List<MockTransaction> transactions) throws IOException
+ {
+ // convert all the data packets in the transactions to strings
+ final List<String> dataPacketContents = new ArrayList<>();
+
+ for (MockTransaction mockTransaction : transactions)
+ {
+ List<DataPacket> dps = mockTransaction.getSentDataPackets();
+ Assert.assertTrue(dps.size() > 0);
+
+ for (DataPacket dp : dps)
+ {
+ final String dpContent = IOUtils.toString(dp.getData());
+ dataPacketContents.add(dpContent);
+ }
+ }
+
+ // verify each expected piece of content is found in the data packet contents
+ for (String expectedContent : expectedContents)
+ {
+ boolean found = false;
+ for (String dataPacket : dataPacketContents)
+ {
+ if (dataPacket.equals(expectedContent))
+ {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue(found);
+ }
+ }
+
+ /**
+ * A builder that can create a NiFiDataPacket from a string.
+ */
+ public static class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String>
+ {
+ @Override
+ public NiFiDataPacket createNiFiDataPacket(String s)
+ {
+ return new StandardNiFiDataPacket(s.getBytes(StandardCharsets.UTF_8), new HashMap<String, String>());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
new file mode 100644
index 0000000..ebe8a0c
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiInputApplication.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.demo;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.nifi.NiFiSinglePortInputOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * A sample application that shows how to receive data to a NiFi Output Port.
+ */
+public class TestNiFiInputApplication implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("Apex")
+ .requestBatchCount(5)
+ .buildConfig();
+
+ final SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder().fromConfig(clientConfig);
+
+ final WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
+
+ NiFiSinglePortInputOperator nifi = dag.addOperator("nifi", new NiFiSinglePortInputOperator(builder, windowDataManager));
+ ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+ dag.addStream("nifi_console", nifi.outputPort, console.input).setLocality(null);
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ StreamingApplication app = new TestNiFiInputApplication();
+ LocalMode.runApp(app, new Configuration(false), 10000);
+ Thread.sleep(2000);
+ System.exit(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
new file mode 100644
index 0000000..f5399e7
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/demo/TestNiFiOutputApplication.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.demo;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.nifi.NiFiDataPacket;
+import com.datatorrent.contrib.nifi.NiFiDataPacketBuilder;
+import com.datatorrent.contrib.nifi.NiFiSinglePortOutputOperator;
+import com.datatorrent.contrib.nifi.StandardNiFiDataPacket;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+import com.datatorrent.lib.util.WindowDataManager;
+
+/**
+ * A sample application that shows how to send data to a NiFi Input Port.
+ */
+public class TestNiFiOutputApplication implements StreamingApplication
+{
+
+ /**
+ * A builder that can create a NiFiDataPacket from a string.
+ */
+ public static class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String>
+ {
+ @Override
+ public NiFiDataPacket createNiFiDataPacket(String s)
+ {
+ return new StandardNiFiDataPacket(s.getBytes(StandardCharsets.UTF_8), new HashMap<String, String>());
+ }
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("Apex")
+ .buildConfig();
+
+ final int batchSize = 1;
+ final SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder().fromConfig(clientConfig);
+ final NiFiDataPacketBuilder<String> dataPacketBuilder = new StringNiFiDataPacketBuilder();
+ final WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
+
+ RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+
+ NiFiSinglePortOutputOperator nifi = dag.addOperator("nifi",
+ new NiFiSinglePortOutputOperator(builder, dataPacketBuilder, windowDataManager ,batchSize));
+
+ dag.addStream("rand_nifi", rand.string_data, nifi.inputPort).setLocality(null);
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ StreamingApplication app = new TestNiFiOutputApplication();
+ LocalMode.runApp(app, new Configuration(false), 10000);
+ Thread.sleep(2000);
+ System.exit(0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java
new file mode 100644
index 0000000..a57a771
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockDataPacket.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.mock;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public class MockDataPacket implements DataPacket
+{
+
+ private final Map<String, String> attributes;
+ private final InputStream inputStream;
+ private final long size;
+
+ public MockDataPacket(Map<String, String> attributes, InputStream inputStream, long size)
+ {
+ this.attributes = attributes;
+ this.inputStream = inputStream;
+ this.size = size;
+ }
+
+ @Override
+ public Map<String, String> getAttributes()
+ {
+ return attributes;
+ }
+
+ @Override
+ public InputStream getData()
+ {
+ return inputStream;
+ }
+
+ @Override
+ public long getSize()
+ {
+ return size;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java
new file mode 100644
index 0000000..ee8fc55
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockSiteToSiteClient.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public class MockSiteToSiteClient implements SiteToSiteClient
+{
+
+ private final SiteToSiteClientConfig config;
+ private final List<DataPacket> queuedDataPackets;
+ private final Iterator<DataPacket> iter;
+ private final List<MockTransaction> transactions;
+
+ public MockSiteToSiteClient(final MockSiteToSiteClient.Builder builder)
+ {
+ this.config = builder.buildConfig();
+ this.queuedDataPackets = (builder.queuedDataPackets == null ?
+ new ArrayList<DataPacket>() : builder.queuedDataPackets);
+ this.iter = queuedDataPackets.iterator();
+ this.transactions = new ArrayList<>();
+ }
+
+ @Override
+ public Transaction createTransaction(TransferDirection direction) throws IOException
+ {
+ MockTransaction transaction = new MockTransaction(iter);
+ transactions.add(transaction);
+ return transaction;
+ }
+
+ @Override
+ public boolean isSecure() throws IOException
+ {
+ return false;
+ }
+
+ @Override
+ public SiteToSiteClientConfig getConfig()
+ {
+ return config;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ // nothing to do
+ }
+
+ public List<MockTransaction> getMockTransactions()
+ {
+ return Collections.unmodifiableList(transactions);
+ }
+
+ /**
+ * A builder for MockSiteToSiteClients.
+ */
+ public static class Builder extends SiteToSiteClient.Builder
+ {
+
+ private List<DataPacket> queuedDataPackets;
+
+ public MockSiteToSiteClient.Builder queue(List<DataPacket> queuedDataPackets)
+ {
+ this.queuedDataPackets = new ArrayList<>(queuedDataPackets);
+ return this;
+ }
+
+ @Override
+ public SiteToSiteClient build()
+ {
+ return new MockSiteToSiteClient(this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2cb84942/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java
new file mode 100644
index 0000000..373f62b
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/mock/MockTransaction.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.nifi.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.Communicant;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+
+/**
+ * A mock Transaction that will return the data passed in the constructor on calls to receive(),
+ * and will store the data packets passed to send().
+ */
+public class MockTransaction implements Transaction
+{
+
+ private Iterator<DataPacket> dataPacketIter;
+
+ private List<DataPacket> sentDataPackets = new ArrayList<>();
+
+ public MockTransaction(Iterator<DataPacket> iter)
+ {
+ this.dataPacketIter = iter;
+ }
+
+ @Override
+ public void send(DataPacket dataPacket) throws IOException
+ {
+ if (dataPacket != null) {
+ this.sentDataPackets.add(dataPacket);
+ }
+ }
+
+ @Override
+ public void send(byte[] content, Map<String, String> attributes) throws IOException
+ {
+ this.sentDataPackets.add(new MockDataPacket(attributes, new ByteArrayInputStream(content), content.length));
+ }
+
+ public List<DataPacket> getSentDataPackets()
+ {
+ return Collections.unmodifiableList(sentDataPackets);
+ }
+
+ @Override
+ public DataPacket receive() throws IOException
+ {
+ if (dataPacketIter != null && dataPacketIter.hasNext()) {
+ return dataPacketIter.next();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void confirm() throws IOException
+ {
+
+ }
+
+ @Override
+ public TransactionCompletion complete() throws IOException
+ {
+ return new TransactionCompletion()
+ {
+ @Override
+ public boolean isBackoff()
+ {
+ return false;
+ }
+
+ @Override
+ public int getDataPacketsTransferred()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesTransferred()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDuration(TimeUnit timeUnit)
+ {
+ return 0;
+ }
+ };
+ }
+
+ @Override
+ public void cancel(String explanation) throws IOException
+ {
+
+ }
+
+ @Override
+ public void error()
+ {
+
+ }
+
+ @Override
+ public TransactionState getState() throws IOException
+ {
+ return TransactionState.TRANSACTION_COMPLETED;
+ }
+
+ @Override
+ public Communicant getCommunicant()
+ {
+ return new Communicant()
+ {
+ @Override
+ public String getUrl()
+ {
+ return null;
+ }
+
+ @Override
+ public String getHost()
+ {
+ return null;
+ }
+
+ @Override
+ public int getPort()
+ {
+ return 0;
+ }
+
+ @Override
+ public String getDistinguishedName()
+ {
+ return null;
+ }
+ };
+ }
+
+}
[2/2] incubator-apex-malhar git commit: Merge branch
'MLHR-1936.nifi-connector' of https://github.com/bbende/incubator-apex-malhar
into devel-3
Posted by th...@apache.org.
Merge branch 'MLHR-1936.nifi-connector' of https://github.com/bbende/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/98495ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/98495ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/98495ab6
Branch: refs/heads/devel-3
Commit: 98495ab62be21cc6a5ebcff32a27dd9a02e5ea23
Parents: a44e81a 2cb8494
Author: Thomas Weise <th...@datatorrent.com>
Authored: Sat Jan 30 11:29:32 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Jan 30 11:29:32 2016 -0800
----------------------------------------------------------------------
contrib/pom.xml | 6 +
.../contrib/nifi/AbstractNiFiInputOperator.java | 212 +++++++++++++++++
.../nifi/AbstractNiFiOutputOperator.java | 187 +++++++++++++++
.../AbstractNiFiSinglePortInputOperator.java | 83 +++++++
.../contrib/nifi/NiFiDataPacket.java | 42 ++++
.../contrib/nifi/NiFiDataPacketBuilder.java | 33 +++
.../nifi/NiFiSinglePortInputOperator.java | 72 ++++++
.../nifi/NiFiSinglePortOutputOperator.java | 126 ++++++++++
.../contrib/nifi/StandardNiFiDataPacket.java | 59 +++++
.../nifi/NiFiSinglePortInputOperatorTest.java | 200 ++++++++++++++++
.../nifi/NiFiSinglePortOutputOperatorTest.java | 231 +++++++++++++++++++
.../nifi/demo/TestNiFiInputApplication.java | 63 +++++
.../nifi/demo/TestNiFiOutputApplication.java | 85 +++++++
.../contrib/nifi/mock/MockDataPacket.java | 58 +++++
.../contrib/nifi/mock/MockSiteToSiteClient.java | 102 ++++++++
.../contrib/nifi/mock/MockTransaction.java | 166 +++++++++++++
16 files changed, 1725 insertions(+)
----------------------------------------------------------------------