You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:29 UTC
[06/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
new file mode 100644
index 0000000..56432d5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public enum TransferDirection {
+
+ SEND,
+ RECEIVE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
new file mode 100644
index 0000000..bfccd98
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public interface VersionedRemoteResource {
+
+ VersionNegotiator getVersionNegotiator();
+
+ String getResourceName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
new file mode 100644
index 0000000..b4206b3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * <p>
+ * Provides a mechanism for encoding and decoding FlowFiles as streams so that
+ * they can be transferred remotely.
+ * </p>
+ */
+public interface FlowFileCodec extends VersionedRemoteResource {
+
+ /**
+ * Returns a List of all versions that this codec is able to support, in the
+ * order that they are preferred by the codec
+ *
+ * @return the supported versions, in the codec's order of preference
+ */
+ public List<Integer> getSupportedVersions();
+
+ /**
+ * Encodes a FlowFile and its content as a single stream of data and writes
+ * that stream to the output. NOTE(review): earlier text referred to a
+ * checksum, but this signature takes no checksum argument.
+ *
+ * @param flowFile the FlowFile to encode
+ * @param session a session that can be used to transactionally create and
+ * transfer flow files
+ * @param outStream the stream to write the data to
+ *
+ * @return the updated FlowFile
+ * @throws IOException presumably on failure to write to outStream (confirm)
+ * @throws TransmissionDisabledException NOTE(review): condition is not documented here; confirm
+ */
+ FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException;
+
+ /**
+ * Decodes the contents of the InputStream, interpreting the data to
+ * determine the next FlowFile's attributes and content, as well as their
+ * destinations. NOTE(review): earlier text referred to a checksum, but this
+ * signature takes no checksum argument.
+ *
+ * @param stream an InputStream containing FlowFiles' contents, attributes,
+ * and destinations
+ * @param session presumably the session used to create the decoded FlowFile (confirm)
+ *
+ * @return the FlowFile that was created, or <code>null</code> if the stream
+ * was out of data
+ * @throws TransmissionDisabledException NOTE(review): condition is not documented here; confirm
+ * @throws IOException presumably on failure to read from stream (confirm)
+ * @throws ProtocolException if the input is malformed
+ */
+ FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
new file mode 100644
index 0000000..f6c2f4f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class BadRequestException extends Exception {
+
+ private static final long serialVersionUID = -8034602852256106560L;
+
+ public BadRequestException(final String message) {
+ super(message);
+ }
+
+ public BadRequestException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
new file mode 100644
index 0000000..b61fc65
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class HandshakeException extends Exception {
+
+ private static final long serialVersionUID = 178192341908726L;
+
+ public HandshakeException(final String message) {
+ super(message);
+ }
+
+ public HandshakeException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
new file mode 100644
index 0000000..24ff3a5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class NotAuthorizedException extends Exception {
+
+ private static final long serialVersionUID = 2952623568114035498L;
+
+ public NotAuthorizedException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
new file mode 100644
index 0000000..af0f467
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class PortNotRunningException extends Exception {
+
+ private static final long serialVersionUID = -2790940982005516375L;
+
+ public PortNotRunningException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
new file mode 100644
index 0000000..0f50b98
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class ProtocolException extends Exception {
+
+ private static final long serialVersionUID = 5763900324505818495L;
+
+ public ProtocolException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public ProtocolException(final String message) {
+ super(message);
+ }
+
+ public ProtocolException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
new file mode 100644
index 0000000..dd675b3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Used to indicate that by the time the request was serviced, it had already
+ * expired
+ */
+public class RequestExpiredException extends Exception {
+
+ private static final long serialVersionUID = -7037025330562827852L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
new file mode 100644
index 0000000..e6a0fe7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class UnknownPortException extends Exception {
+
+ private static final long serialVersionUID = -2790940982005516375L; // NOTE(review): same value as PortNotRunningException; likely copy-paste, harmless because the UID is checked per class
+
+ public UnknownPortException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
new file mode 100644
index 0000000..32274eb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+
+public interface ClientProtocol extends VersionedRemoteResource {
+
+ void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException;
+
+ Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException;
+
+ FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
+
+ void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ void shutdown(Peer peer) throws IOException, ProtocolException;
+
+ boolean isReadyForFileTransfer();
+
+ /**
+ * Returns <code>true</code> if remote instance indicates that the port is
+ * invalid
+ *
+ * @return <code>true</code> if the remote instance reported the port as invalid
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed
+ */
+ boolean isPortInvalid() throws IllegalStateException;
+
+ /**
+ * Returns <code>true</code> if remote instance indicates that the port is
+ * unknown
+ *
+ * @return <code>true</code> if the remote instance reported the port as unknown
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed (NOTE(review): documented but, unlike isPortInvalid, not declared)
+ */
+ boolean isPortUnknown();
+
+ /**
+ * Returns <code>true</code> if remote instance indicates that the port's
+ * destination is full
+ *
+ * @return <code>true</code> if the remote instance reported the port's destination as full
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed (NOTE(review): documented but, unlike isPortInvalid, not declared)
+ */
+ boolean isDestinationFull();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
new file mode 100644
index 0000000..d2e2946
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface CommunicationsInput {
+
+ InputStream getInputStream() throws IOException;
+
+ long getBytesRead();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
new file mode 100644
index 0000000..95cab29
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface CommunicationsOutput {
+
+ OutputStream getOutputStream() throws IOException;
+
+ long getBytesWritten();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
new file mode 100644
index 0000000..d009cec
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface CommunicationsSession extends Closeable {
+
+ public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
+
+ CommunicationsInput getInput();
+
+ CommunicationsOutput getOutput();
+
+ void setTimeout(int millis) throws IOException;
+
+ int getTimeout() throws IOException;
+
+ void setUri(String uri);
+
+ String getUri();
+
+ String getUserDn();
+
+ void setUserDn(String dn);
+
+ boolean isDataAvailable();
+
+ long getBytesWritten();
+
+ long getBytesRead();
+
+ /**
+ * Asynchronously interrupts this CommunicationsSession. Implementations must
+ * ensure that they stop sending and receiving data as soon as possible after
+ * this method has been called, even if doing so results in sending only
+ * partial data to the peer. This will usually result in the peer throwing a
+ * SocketTimeoutException.
+ */
+ void interrupt();
+
+ /**
+ * Returns <code>true</code> if the connection is closed, <code>false</code>
+ * otherwise.
+ *
+ * @return <code>true</code> if the connection is closed, <code>false</code> otherwise
+ */
+ boolean isClosed();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
new file mode 100644
index 0000000..41334fe
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public enum RequestType {
+ // Constant names are what writeRequestType puts on the stream, so renaming one changes what peers must send.
+ NEGOTIATE_FLOWFILE_CODEC,
+ REQUEST_PEER_LIST,
+ SEND_FLOWFILES,
+ RECEIVE_FLOWFILES,
+ SHUTDOWN;
+ /** Writes this constant's {@link #name()} to the given stream using {@link DataOutputStream#writeUTF(String)}. */
+ public void writeRequestType(final DataOutputStream dos) throws IOException {
+ dos.writeUTF(name());
+ }
+ /** Reads a UTF string and maps it to a constant; throws IOException (cause attached) if no constant has that name. */
+ public static RequestType readRequestType(final DataInputStream dis) throws IOException {
+ final String requestTypeVal = dis.readUTF();
+ try {
+ return RequestType.valueOf(requestTypeVal);
+ } catch (final IllegalArgumentException e) {
+ throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
new file mode 100644
index 0000000..0d18f2e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+
+import org.apache.nifi.cluster.NodeInformant;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.ProtocolException;
+
+/**
+ * Contract for the protocol operations performed against a {@link Peer}:
+ * handshaking, codec negotiation, reading the next request type, sending and
+ * receiving FlowFiles, sending the peer list and shutting down.
+ */
+public interface ServerProtocol extends VersionedRemoteResource {
+
+ /**
+ * Sets the root ProcessGroup for this Protocol
+ *
+ * @param rootGroup the root ProcessGroup
+ */
+ void setRootProcessGroup(ProcessGroup rootGroup);
+
+ /**
+ * @return the RootGroupPort associated with this Protocol.
+ * NOTE(review): when and how the port gets associated is not visible from
+ * this interface - confirm against the implementation
+ */
+ RootGroupPort getPort();
+
+ /**
+ * Optional operation. Sets the NodeInformant to use in this Protocol, if a
+ * NodeInformant is supported. Otherwise, throws
+ * UnsupportedOperationException
+ *
+ * @param nodeInformant the NodeInformant to use
+ */
+ void setNodeInformant(NodeInformant nodeInformant);
+
+ /**
+ * Receives the handshake from the Peer
+ *
+ * @param peer the Peer to receive the handshake from
+ * @throws IOException declared for I/O failures while handshaking
+ * @throws HandshakeException declared for a handshake that cannot be
+ * completed
+ */
+ void handshake(Peer peer) throws IOException, HandshakeException;
+
+ /**
+ * Returns <code>true</code> if the handshaking process was completed
+ * successfully, <code>false</code> if either the handshaking process has
+ * not happened or the handshake failed
+ *
+ * @return <code>true</code> only if a handshake has completed successfully
+ */
+ boolean isHandshakeSuccessful();
+
+ /**
+ * Negotiates the FlowFileCodec that is to be used for transferring
+ * FlowFiles
+ *
+ * @param peer the Peer to negotiate with
+ * @return the negotiated FlowFileCodec
+ * @throws IOException declared for I/O failures during negotiation
+ * @throws ProtocolException declared by the signature; presumably covers
+ * BadRequestException as a subtype - confirm
+ */
+ FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
+
+ /**
+ * Returns the codec that has already been negotiated by this Protocol, if
+ * any.
+ *
+ * @return the previously negotiated codec. NOTE(review): the value returned
+ * when no codec has been negotiated is not specified here - presumably
+ * null, confirm
+ */
+ FlowFileCodec getPreNegotiatedCodec();
+
+ /**
+ * Reads the Request Type of the next request from the Peer
+ *
+ * @param peer the Peer to read the request from
+ * @return the RequestType that the peer would like to happen - or null, if
+ * no data available
+ * @throws IOException declared for I/O failures while reading
+ */
+ RequestType getRequestType(Peer peer) throws IOException;
+
+ /**
+ * Sends FlowFiles to the specified peer
+ *
+ * @param peer the Peer to send FlowFiles to
+ * @param context the ProcessContext to use
+ * @param session the ProcessSession to use
+ * @param codec the FlowFileCodec to use
+ *
+ * @return the number of FlowFiles transferred
+ * @throws IOException declared for I/O failures during the transfer
+ * @throws ProtocolException declared for protocol-level failures
+ */
+ int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ /**
+ * Receives FlowFiles from the specified peer
+ *
+ * @param peer the Peer to receive FlowFiles from
+ * @param context the ProcessContext to use
+ * @param session the ProcessSession to use
+ * @param codec the FlowFileCodec to use
+ * @throws IOException declared for I/O failures during the transfer
+ *
+ * @return the number of FlowFiles received
+ * @throws ProtocolException declared for protocol-level failures
+ */
+ int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ /**
+ * Returns the number of milliseconds after a request is received for which
+ * the request is still valid. A value of 0 indicates that the request will
+ * not expire.
+ *
+ * @return the request expiration in milliseconds, or 0 for no expiration
+ */
+ long getRequestExpiration();
+
+ /**
+ * Sends a list of all nodes in the cluster to the specified peer. If not in
+ * a cluster, sends info about itself
+ *
+ * @param peer the Peer to send the list to
+ * @throws IOException declared for I/O failures while sending
+ */
+ void sendPeerList(Peer peer) throws IOException;
+
+ /**
+ * Shuts down this Protocol with respect to the given peer.
+ * NOTE(review): exact shutdown semantics are not visible from this
+ * interface - confirm against the implementation
+ *
+ * @param peer the Peer involved in the shutdown
+ */
+ void shutdown(Peer peer);
+
+ /**
+ * @return presumably <code>true</code> once {@link #shutdown(Peer)} has
+ * been invoked - confirm against the implementation
+ */
+ boolean isShutdown();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/.gitignore
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/.gitignore b/nar-bundles/framework-bundle/framework/core/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/.gitignore
@@ -0,0 +1 @@
+/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/pom.xml b/nar-bundles/framework-bundle/framework/core/pom.xml
new file mode 100644
index 0000000..547c75d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <!-- Maven descriptor for the framework-core jar; inherits from nifi-framework-parent. -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-framework-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>framework-core</artifactId>
+ <packaging>jar</packaging>
+ <name>NiFi Framework Core</name>
+ <!-- Almost every dependency below omits its version; presumably the versions
+ are supplied by dependencyManagement in a parent POM (confirm). -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>core-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-expression-language</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-file-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-properties</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>site-to-site</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-core-flowfile-attributes</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>framework-cluster-protocol</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-logging-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>client-dto</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jasypt</groupId>
+ <artifactId>jasypt</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk16</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>data-provenance-utils</artifactId>
+ </dependency>
+ <!-- NOTE(review): this is the only dependency in this POM that pins its own
+ version, and it is a SNAPSHOT. Confirm whether it should be managed
+ centrally like the others. -->
+ <dependency>
+ <groupId>wali</groupId>
+ <artifactId>wali</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ </dependency>
+ <!-- Test-scoped only. -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
new file mode 100644
index 0000000..1249657
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Set;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.jaxb.BulletinAdapter;
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ * The payload of the bulletins. Instances are converted to and from bytes
+ * through a JAXBContext that is created once, when the class is initialized.
+ *
+ * @author unattributed
+ */
+@XmlRootElement
+public class BulletinsPayload {
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(BulletinsPayload.class);
+        } catch (final JAXBException e) {
+            // Chain the JAXBException: without it the reason the context could
+            // not be built is lost when class initialization fails.
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+    private Set<Bulletin> bulletins;
+
+    /**
+     * @return the bulletins carried by this payload, as last set through
+     * {@link #setBulletins(Set)}
+     */
+    @XmlJavaTypeAdapter(BulletinAdapter.class)
+    public Set<Bulletin> getBulletins() {
+        return bulletins;
+    }
+
+    /**
+     * @param bulletins the bulletins to carry; the reference is stored as is
+     */
+    public void setBulletins(final Set<Bulletin> bulletins) {
+        this.bulletins = bulletins;
+    }
+
+    /**
+     * Marshals this payload into a byte array.
+     *
+     * @return the marshalled form of this payload
+     * @throws ProtocolException if marshalling fails
+     */
+    public byte[] marshal() throws ProtocolException {
+        final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
+        marshal(this, payloadBytes);
+        return payloadBytes.toByteArray();
+    }
+
+    /**
+     * Marshals the given payload to the given stream using a Marshaller
+     * created for this call.
+     *
+     * @param payload the payload to marshal
+     * @param os the stream to write to
+     * @throws ProtocolException wrapping any JAXBException raised while
+     * marshalling
+     */
+    public static void marshal(final BulletinsPayload payload, final OutputStream os) throws ProtocolException {
+        try {
+            final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(payload, os);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    /**
+     * Unmarshals a payload from the given stream using an Unmarshaller
+     * created for this call.
+     *
+     * @param is the stream to read from
+     * @return the unmarshalled payload
+     * @throws ProtocolException wrapping any JAXBException raised while
+     * unmarshalling
+     */
+    public static BulletinsPayload unmarshal(final InputStream is) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (BulletinsPayload) unmarshaller.unmarshal(is);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    /**
+     * Unmarshals a payload from the given bytes by delegating to
+     * {@link #unmarshal(InputStream)}.
+     *
+     * @param bytes the marshalled payload
+     * @return the unmarshalled payload
+     * @throws ProtocolException wrapping any JAXBException raised while
+     * unmarshalling
+     */
+    public static BulletinsPayload unmarshal(final byte[] bytes) throws ProtocolException {
+        return unmarshal(new ByteArrayInputStream(bytes));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
new file mode 100644
index 0000000..986e904
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+/**
+ * Unchecked exception signalling that connecting to the cluster failed.
+ *
+ * @author unattributed
+ */
+public class ConnectionException extends RuntimeException {
+
+    private static final long serialVersionUID = -1378294897231234028L;
+
+    /**
+     * Creates an exception with neither a message nor a cause.
+     */
+    public ConnectionException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public ConnectionException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying failure
+     */
+    public ConnectionException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying failure
+     */
+    public ConnectionException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
new file mode 100644
index 0000000..55707f3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+/**
+ * Unchecked exception signalling that disconnecting from the cluster failed.
+ *
+ * @author unattributed
+ */
+public class DisconnectionException extends RuntimeException {
+
+    private static final long serialVersionUID = 6648876367997026125L;
+
+    /**
+     * Creates an exception with neither a message nor a cause.
+     */
+    public DisconnectionException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public DisconnectionException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying failure
+     */
+    public DisconnectionException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying failure
+     */
+    public DisconnectionException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
new file mode 100644
index 0000000..093b238
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.controller.Counter;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.jaxb.CounterAdapter;
+
+/**
+ * The payload of the heartbeat. The payload contains status to inform the
+ * cluster manager the current workload of this node. Instances are converted
+ * to and from bytes through a JAXBContext that is created once, when the
+ * class is initialized.
+ *
+ * @author unattributed
+ */
+@XmlRootElement
+public class HeartbeatPayload {
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class);
+        } catch (final JAXBException e) {
+            // Chain the JAXBException: without it the reason the context could
+            // not be built is lost when class initialization fails.
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+    private List<Counter> counters;
+    private ProcessGroupStatus processGroupStatus;
+    private int activeThreadCount;
+    private long totalFlowFileCount;
+    private long totalFlowFileBytes;
+    private SystemDiagnostics systemDiagnostics;
+    // Boxed, so the port can be left unset (null).
+    private Integer siteToSitePort;
+    private boolean siteToSiteSecure;
+    private long systemStartTime;
+
+    // NOTE: the accessor order below is kept exactly as it was; JAXB may
+    // derive property order from it, so do not reorder casually.
+
+    /**
+     * @return the counters carried by this payload
+     */
+    @XmlJavaTypeAdapter(CounterAdapter.class)
+    public List<Counter> getCounters() {
+        return counters;
+    }
+
+    /**
+     * @param counters the counters to carry; the reference is stored as is
+     */
+    public void setCounters(final List<Counter> counters) {
+        this.counters = counters;
+    }
+
+    /**
+     * @return the active thread count
+     */
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    /**
+     * @param activeThreadCount the active thread count
+     */
+    public void setActiveThreadCount(final int activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    /**
+     * @return the total FlowFile count
+     */
+    public long getTotalFlowFileCount() {
+        return totalFlowFileCount;
+    }
+
+    /**
+     * @param totalFlowFileCount the total FlowFile count
+     */
+    public void setTotalFlowFileCount(final long totalFlowFileCount) {
+        this.totalFlowFileCount = totalFlowFileCount;
+    }
+
+    /**
+     * @return the total FlowFile size in bytes
+     */
+    public long getTotalFlowFileBytes() {
+        return totalFlowFileBytes;
+    }
+
+    /**
+     * @param totalFlowFileBytes the total FlowFile size in bytes
+     */
+    public void setTotalFlowFileBytes(final long totalFlowFileBytes) {
+        this.totalFlowFileBytes = totalFlowFileBytes;
+    }
+
+    /**
+     * @return the process group status
+     */
+    public ProcessGroupStatus getProcessGroupStatus() {
+        return processGroupStatus;
+    }
+
+    /**
+     * @param processGroupStatus the process group status
+     */
+    public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) {
+        this.processGroupStatus = processGroupStatus;
+    }
+
+    /**
+     * @return the system diagnostics
+     */
+    public SystemDiagnostics getSystemDiagnostics() {
+        return systemDiagnostics;
+    }
+
+    /**
+     * @param systemDiagnostics the system diagnostics
+     */
+    public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) {
+        this.systemDiagnostics = systemDiagnostics;
+    }
+
+    /**
+     * @return the site-to-site secure flag
+     */
+    public boolean isSiteToSiteSecure() {
+        return siteToSiteSecure;
+    }
+
+    /**
+     * @param secure the site-to-site secure flag
+     */
+    public void setSiteToSiteSecure(final boolean secure) {
+        this.siteToSiteSecure = secure;
+    }
+
+    /**
+     * @return the site-to-site port, or null if none has been set
+     */
+    public Integer getSiteToSitePort() {
+        return siteToSitePort;
+    }
+
+    /**
+     * @param port the site-to-site port; may be null
+     */
+    public void setSiteToSitePort(final Integer port) {
+        this.siteToSitePort = port;
+    }
+
+    /**
+     * @return the system start time. NOTE(review): presumably epoch
+     * milliseconds - confirm against the caller
+     */
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    /**
+     * @param systemStartTime the system start time
+     */
+    public void setSystemStartTime(final long systemStartTime) {
+        this.systemStartTime = systemStartTime;
+    }
+
+    /**
+     * Marshals this payload into a byte array.
+     *
+     * @return the marshalled form of this payload
+     * @throws ProtocolException if marshalling fails
+     */
+    public byte[] marshal() throws ProtocolException {
+        final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
+        marshal(this, payloadBytes);
+        return payloadBytes.toByteArray();
+    }
+
+    /**
+     * Marshals the given payload to the given stream using a Marshaller
+     * created for this call.
+     *
+     * @param payload the payload to marshal
+     * @param os the stream to write to
+     * @throws ProtocolException wrapping any JAXBException raised while
+     * marshalling
+     */
+    public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException {
+        try {
+            final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(payload, os);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    /**
+     * Unmarshals a payload from the given stream using an Unmarshaller
+     * created for this call.
+     *
+     * @param is the stream to read from
+     * @return the unmarshalled payload
+     * @throws ProtocolException wrapping any JAXBException raised while
+     * unmarshalling
+     */
+    public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (HeartbeatPayload) unmarshaller.unmarshal(is);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    /**
+     * Unmarshals a payload from the given bytes by delegating to
+     * {@link #unmarshal(InputStream)}.
+     *
+     * @param bytes the marshalled payload
+     * @return the unmarshalled payload
+     * @throws ProtocolException wrapping any JAXBException raised while
+     * unmarshalling
+     */
+    public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException {
+        return unmarshal(new ByteArrayInputStream(bytes));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
new file mode 100644
index 0000000..1efa0cd
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractPort;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+/**
+ * Provides a mechanism by which <code>FlowFile</code>s can be transferred into
+ * and out of a <code>ProcessGroup</code> to and/or from another
+ * <code>ProcessGroup</code> within the same instance of NiFi.
+ *
+ * <p>
+ * Connection changes (add, update, remove) are performed under the write lock
+ * of a ReentrantReadWriteLock, while {@link #onTrigger} and the connection
+ * accessors hold the read lock.
+ * </p>
+ */
+public class LocalPort extends AbstractPort {
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    public LocalPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
+        super(id, name, processGroup, type, scheduler);
+    }
+
+    /**
+     * @return <code>true</code> if at least one connection exists for the
+     * anonymous relationship
+     */
+    @Override
+    public boolean isValid() {
+        final Set<Connection> anonymousConnections = getConnections(Relationship.ANONYMOUS);
+        return !anonymousConnections.isEmpty();
+    }
+
+    /**
+     * @return a new collection that is empty when this port is valid and
+     * otherwise holds a single result describing the missing output connection
+     */
+    @Override
+    public Collection<ValidationResult> getValidationErrors() {
+        final Collection<ValidationResult> problems = new ArrayList<>();
+        if (isValid()) {
+            return problems;
+        }
+
+        final ValidationResult.Builder builder = new ValidationResult.Builder();
+        builder.explanation(String.format("Output connection for port '%s' is not defined.", getName()));
+        builder.subject(String.format("Port '%s'", getName()));
+        builder.valid(false);
+        problems.add(builder.build());
+        return problems;
+    }
+
+    /**
+     * Pulls up to 10 FlowFiles from the session and transfers them to the
+     * anonymous relationship; yields the context when none are available.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        readLock.lock();
+        try {
+            final List<FlowFile> pulled = session.get(10);
+            if (!pulled.isEmpty()) {
+                session.transfer(pulled, Relationship.ANONYMOUS);
+                return;
+            }
+
+            // Nothing to move on this invocation.
+            context.yield();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Delegates to the superclass while holding the write lock.
+     */
+    @Override
+    public void updateConnection(final Connection connection) throws IllegalStateException {
+        writeLock.lock();
+        try {
+            super.updateConnection(connection);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Delegates to the superclass while holding the write lock.
+     */
+    @Override
+    public void addConnection(final Connection connection) throws IllegalArgumentException {
+        writeLock.lock();
+        try {
+            super.addConnection(connection);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Delegates to the superclass while holding the write lock.
+     */
+    @Override
+    public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
+        writeLock.lock();
+        try {
+            super.removeConnection(connection);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Delegates to the superclass while holding the read lock.
+     */
+    @Override
+    public Set<Connection> getConnections() {
+        readLock.lock();
+        try {
+            return super.getConnections();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Delegates to the superclass while holding the read lock.
+     */
+    @Override
+    public Set<Connection> getConnections(Relationship relationship) {
+        readLock.lock();
+        try {
+            return super.getConnections(relationship);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Delegates to the superclass while holding the read lock.
+     */
+    @Override
+    public List<Connection> getIncomingConnections() {
+        readLock.lock();
+        try {
+            return super.getIncomingConnections();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Delegates to the superclass while holding the read lock.
+     */
+    @Override
+    public boolean hasIncomingConnection() {
+        readLock.lock();
+        try {
+            return super.hasIncomingConnection();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return always <code>false</code>
+     */
+    @Override
+    public boolean isTriggerWhenEmpty() {
+        return false;
+    }
+
+    /**
+     * @return always {@link SchedulingStrategy#EVENT_DRIVEN}
+     */
+    @Override
+    public SchedulingStrategy getSchedulingStrategy() {
+        return SchedulingStrategy.EVENT_DRIVEN;
+    }
+
+    /**
+     * @return always <code>true</code>
+     */
+    @Override
+    public boolean isSideEffectFree() {
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
new file mode 100644
index 0000000..1d723b5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Models a connection between connectable components. A connection may contain
+ * one or more relationships that map the source component to the destination
+ * component.
+ */
+public final class StandardConnection implements Connection {
+
+    private final String id;
+    private final AtomicReference<ProcessGroup> processGroup;
+    private final AtomicReference<String> name;
+    // always holds an unmodifiable snapshot that is replaced wholesale by setBendPoints
+    private final AtomicReference<List<Position>> bendPoints;
+    private final Connectable source;
+    private final AtomicReference<Connectable> destination;
+    // always holds an unmodifiable List snapshot (see snapshotOf) so that equals() compares contents
+    private final AtomicReference<Collection<Relationship>> relationships;
+    private final StandardFlowFileQueue flowFileQueue;
+    private final AtomicInteger labelIndex = new AtomicInteger(1);
+    private final AtomicLong zIndex = new AtomicLong(0L);
+    private final ProcessScheduler scheduler;
+    // the identifier never changes, so the hash is computed once in the constructor
+    private final int hashCode;
+
+    /**
+     * Creates a connection from the values collected by the given builder.
+     * Instances are only created through {@link Builder#build()}.
+     *
+     * @param builder the builder holding the already validated configuration
+     */
+    private StandardConnection(final Builder builder) {
+        id = builder.id;
+        name = new AtomicReference<>(builder.name);
+        bendPoints = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(builder.bendPoints)));
+        processGroup = new AtomicReference<>(builder.processGroup);
+        source = builder.source;
+        destination = new AtomicReference<>(builder.destination);
+        relationships = new AtomicReference<>(snapshotOf(builder.relationships));
+        scheduler = builder.scheduler;
+        // NOTE(review): 'this' is handed to the queue before construction has finished
+        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold());
+        hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
+    }
+
+    /**
+     * Copies the given relationships into an unmodifiable list. A List wrapper
+     * is used on purpose: unlike Collections.unmodifiableCollection, the
+     * unmodifiable List passes equals() through to the backing list, which the
+     * "nothing changed" check in {@link #setRelationships(Collection)} relies on.
+     *
+     * @param relationships the relationships to copy
+     * @return an unmodifiable copy of the given relationships
+     */
+    private static Collection<Relationship> snapshotOf(final Collection<Relationship> relationships) {
+        return Collections.unmodifiableList(new ArrayList<>(relationships));
+    }
+
+    /**
+     * @return the process group currently associated with this connection
+     */
+    public ProcessGroup getProcessGroup() {
+        return processGroup.get();
+    }
+
+    /**
+     * @return the identifier given to the builder, or the random UUID the
+     * builder generated when none was supplied
+     */
+    public String getIdentifier() {
+        return id;
+    }
+
+    /**
+     * @return the current name of this connection; may be null if none was set
+     */
+    public String getName() {
+        return name.get();
+    }
+
+    /**
+     * @param name the new name of this connection
+     */
+    public void setName(final String name) {
+        this.name.set(name);
+    }
+
+    /**
+     * @return an unmodifiable view of the current bend points
+     */
+    @Override
+    public List<Position> getBendPoints() {
+        return bendPoints.get();
+    }
+
+    /**
+     * Replaces the bend points with an unmodifiable copy of the given list.
+     *
+     * @param position the new bend points
+     */
+    @Override
+    public void setBendPoints(final List<Position> position) {
+        this.bendPoints.set(Collections.unmodifiableList(new ArrayList<>(position)));
+    }
+
+    /**
+     * @return the current label index (initially 1)
+     */
+    public int getLabelIndex() {
+        return labelIndex.get();
+    }
+
+    /**
+     * @param labelIndex the new label index
+     */
+    public void setLabelIndex(final int labelIndex) {
+        this.labelIndex.set(labelIndex);
+    }
+
+    /**
+     * @return the current z-index (initially 0)
+     */
+    @Override
+    public long getZIndex() {
+        return zIndex.get();
+    }
+
+    /**
+     * @param zIndex the new z-index
+     */
+    @Override
+    public void setZIndex(final long zIndex) {
+        this.zIndex.set(zIndex);
+    }
+
+    /**
+     * @return the source of this connection; it is fixed at construction time
+     */
+    public Connectable getSource() {
+        return source;
+    }
+
+    /**
+     * @return the current destination of this connection
+     */
+    public Connectable getDestination() {
+        return destination.get();
+    }
+
+    /**
+     * @return an unmodifiable snapshot of the relationships of this connection
+     */
+    public Collection<Relationship> getRelationships() {
+        return relationships.get();
+    }
+
+    /**
+     * @return the queue backing this connection
+     */
+    public FlowFileQueue getFlowFileQueue() {
+        return flowFileQueue;
+    }
+
+    /**
+     * @param newGroup the process group to associate with this connection
+     */
+    public void setProcessGroup(final ProcessGroup newGroup) {
+        // a plain reference swap needs no rollback handling
+        this.processGroup.set(newGroup);
+    }
+
+    /**
+     * Replaces the relationships of this connection and notifies the source
+     * of the change. Nothing happens when the given relationships equal the
+     * current ones (list equality, so order matters and a non-List argument
+     * never counts as equal).
+     *
+     * @param newRelationships the relationships this connection should carry
+     * @throws IllegalStateException if the relationships differ and the source is running
+     */
+    public void setRelationships(final Collection<Relationship> newRelationships) {
+        final Collection<Relationship> currentRelationships = relationships.get();
+        if (currentRelationships.equals(newRelationships)) {
+            return;
+        }
+
+        if (getSource().isRunning()) {
+            throw new IllegalStateException("Cannot update the relationships for Connection because the source of the Connection is running");
+        }
+
+        try {
+            // store an unmodifiable copy, consistent with the constructor, so callers
+            // of getRelationships() can never modify the internal state
+            this.relationships.set(snapshotOf(newRelationships));
+            getSource().updateConnection(this);
+        } catch (final RuntimeException e) {
+            // restore the previous relationships if the copy or the source update failed
+            this.relationships.set(currentRelationships);
+            throw e;
+        }
+    }
+
+    /**
+     * Moves this connection to a new destination: the connection is removed
+     * from the previous destination, the source is told about the update, the
+     * connection is added to the new destination and an event is registered
+     * for the new destination with the scheduler. Nothing happens when the
+     * new destination equals the current one.
+     *
+     * @param newDestination the component this connection should now feed
+     * @throws IllegalStateException if the current destination is running and
+     * is neither a Funnel nor a LocalPort
+     */
+    public void setDestination(final Connectable newDestination) {
+        final Connectable previousDestination = destination.get();
+        if (previousDestination.equals(newDestination)) {
+            return;
+        }
+
+        final boolean alwaysChangeable = previousDestination instanceof Funnel || previousDestination instanceof LocalPort;
+        if (previousDestination.isRunning() && !alwaysChangeable) {
+            throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
+        }
+
+        try {
+            previousDestination.removeConnection(this);
+            this.destination.set(newDestination);
+            getSource().updateConnection(this);
+
+            newDestination.addConnection(this);
+            scheduler.registerEvent(newDestination);
+        } catch (final RuntimeException e) {
+            // NOTE(review): only the destination reference is restored here; if the failure
+            // happened after removeConnection succeeded, the previous destination is not
+            // given this connection back - confirm whether that is intended
+            this.destination.set(previousDestination);
+            throw e;
+        }
+    }
+
+    /**
+     * Locks the underlying queue.
+     */
+    @Override
+    public void lock() {
+        flowFileQueue.lock();
+    }
+
+    /**
+     * Unlocks the underlying queue.
+     */
+    @Override
+    public void unlock() {
+        flowFileQueue.unlock();
+    }
+
+    /**
+     * Polls the underlying queue with the given filter.
+     *
+     * @param filter the filter handed to the queue
+     * @param expiredRecords handed to the queue as-is; presumably filled with
+     * the records found to be expired - confirm against the queue implementation
+     * @return the records returned by the queue
+     */
+    @Override
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+        return flowFileQueue.poll(filter, expiredRecords);
+    }
+
+    /**
+     * Two connections are equal when their identifiers are equal.
+     *
+     * @param other the object to compare with
+     * @return true if other is a Connection with the same identifier
+     */
+    @Override
+    public boolean equals(final Object other) {
+        if (!(other instanceof Connection)) {
+            return false;
+        }
+        final Connection con = (Connection) other;
+        return new EqualsBuilder().append(id, con.getIdentifier()).isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        // the closing bracket was missing, leaving the "Connection[" prefix unbalanced
+        return "Connection[ID=" + id + ",Name=" + name.get() + ",Source=" + getSource() + ",Destination=" + getDestination() + ",Relationships=" + getRelationships() + "]";
+    }
+
+    /**
+     * Puts the given FlowFile onto the queue of this connection.
+     * NOTE(review): the original comment stated that this gives the Connection
+     * ownership of the FlowFile without providing it to consumers, so that the
+     * Connection is not deleted in the middle of a Session commit; that cannot
+     * be confirmed from this class, which simply delegates to the queue.
+     *
+     * @param flowFile the FlowFile to enqueue
+     */
+    @Override
+    public void enqueue(final FlowFileRecord flowFile) {
+        flowFileQueue.put(flowFile);
+    }
+
+    /**
+     * Puts all of the given FlowFiles onto the queue of this connection.
+     *
+     * @param flowFiles the FlowFiles to enqueue
+     */
+    @Override
+    public void enqueue(final Collection<FlowFileRecord> flowFiles) {
+        flowFileQueue.putAll(flowFiles);
+    }
+
+    /**
+     * Collects the configuration of a {@link StandardConnection}. A source
+     * and a destination are mandatory; the identifier defaults to a random
+     * UUID.
+     */
+    public static class Builder {
+
+        private final ProcessScheduler scheduler;
+
+        private String id = UUID.randomUUID().toString();
+        private String name;
+        private List<Position> bendPoints = new ArrayList<>();
+        private ProcessGroup processGroup;
+        private Connectable source;
+        private Connectable destination;
+        private Collection<Relationship> relationships;
+
+        /**
+         * @param scheduler the scheduler handed to the connection and to its queue
+         */
+        public Builder(final ProcessScheduler scheduler) {
+            this.scheduler = scheduler;
+        }
+
+        public Builder id(final String id) {
+            this.id = id;
+            return this;
+        }
+
+        public Builder source(final Connectable source) {
+            this.source = source;
+            return this;
+        }
+
+        public Builder processGroup(final ProcessGroup group) {
+            this.processGroup = group;
+            return this;
+        }
+
+        public Builder destination(final Connectable destination) {
+            this.destination = destination;
+            return this;
+        }
+
+        /**
+         * @param relationships the relationships to use; they are copied
+         * @return this builder
+         */
+        public Builder relationships(final Collection<Relationship> relationships) {
+            this.relationships = new ArrayList<>(relationships);
+            return this;
+        }
+
+        public Builder name(final String name) {
+            this.name = name;
+            return this;
+        }
+
+        /**
+         * Replaces any bend points collected so far with the given ones.
+         *
+         * @param bendPoints the bend points to use; they are copied
+         * @return this builder
+         */
+        public Builder bendPoints(final List<Position> bendPoints) {
+            this.bendPoints.clear();
+            this.bendPoints.addAll(bendPoints);
+            return this;
+        }
+
+        public Builder addBendPoint(final Position bendPoint) {
+            bendPoints.add(bendPoint);
+            return this;
+        }
+
+        /**
+         * Validates the collected values and creates the connection.
+         *
+         * @return the new connection
+         * @throws IllegalStateException if the source or destination is
+         * missing, or if the source is a processor and no relationships were given
+         */
+        public StandardConnection build() {
+            if (source == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Source");
+            }
+            if (destination == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Destination");
+            }
+
+            if (relationships == null) {
+                relationships = new ArrayList<>();
+            }
+
+            if (relationships.isEmpty()) {
+                // ensure relationships have been specified for processors, otherwise the anonymous relationship is used
+                if (source.getConnectableType() == ConnectableType.PROCESSOR) {
+                    throw new IllegalStateException("Cannot build a Connection without any relationships");
+                }
+                relationships.add(Relationship.ANONYMOUS);
+            }
+
+            return new StandardConnection(this);
+        }
+    }
+
+    @Override
+    public void verifyCanUpdate() {
+        // StandardConnection can always be updated
+    }
+
+    /**
+     * Verifies that this connection may be deleted.
+     *
+     * @throws IllegalStateException if the queue is not empty, or if the
+     * source is running and is not a funnel
+     */
+    @Override
+    public void verifyCanDelete() {
+        if (!flowFileQueue.isEmpty()) {
+            throw new IllegalStateException("Queue not empty for " + this);
+        }
+
+        // a running funnel does not prevent the deletion
+        if (source.isRunning() && !ConnectableType.FUNNEL.equals(source.getConnectableType())) {
+            throw new IllegalStateException("Source of Connection (" + source + ") is running");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
new file mode 100644
index 0000000..f36a459
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.Connectables;
+
+public class EventDrivenWorkerQueue implements WorkerQueue {
+
+ private final Object workMonitor = new Object();
+
+ private final Map<Connectable, Worker> workerMap = new HashMap<>(); // protected by synchronizing on workMonitor
+ private final WorkerReadyQueue workerQueue;
+
+ public EventDrivenWorkerQueue(final boolean clustered, final boolean primary, final ProcessScheduler scheduler) {
+ workerQueue = new WorkerReadyQueue(scheduler);
+ workerQueue.setClustered(clustered);
+ workerQueue.setPrimary(primary);
+ }
+
+ @Override
+ public void setClustered(final boolean clustered) {
+ workerQueue.setClustered(clustered);
+ }
+
+ @Override
+ public void setPrimary(final boolean primary) {
+ workerQueue.setPrimary(primary);
+ }
+
+ @Override
+ public Worker poll(final long timeout, final TimeUnit timeUnit) {
+ final long maxTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+ while (System.currentTimeMillis() < maxTime) {
+ synchronized (workMonitor) {
+ final Worker worker = workerQueue.poll();
+ if (worker == null) {
+ // nothing to do. wait until we have something to do.
+ final long timeLeft = maxTime - System.currentTimeMillis();
+ if (timeLeft <= 0) {
+ return null;
+ }
+
+ try {
+ workMonitor.wait(timeLeft);
+ } catch (final InterruptedException ignored) {
+ }
+ } else {
+ // Decrement the amount of work there is to do for this worker.
+ final int workLeft = worker.decrementEventCount();
+ if (workLeft > 0) {
+ workerQueue.offer(worker);
+ }
+
+ return worker;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void offer(final Connectable connectable) {
+ synchronized (workMonitor) {
+ Worker worker = workerMap.get(connectable);
+ if (worker == null) {
+ // if worker is null, then it has not been scheduled to run; ignore the event.
+ return;
+ }
+
+ final int countBefore = worker.incrementEventCount();
+ if (countBefore < 0) {
+ worker.setWorkCount(1);
+ }
+ if (countBefore <= 0) {
+ // If countBefore > 0 then it's already on the queue, so just incrementing its counter is sufficient.
+ workerQueue.offer(worker);
+ }
+
+ workMonitor.notify();
+ }
+ }
+
+ private int getWorkCount(final Connectable connectable) {
+ int sum = 0;
+ for (final Connection connection : connectable.getIncomingConnections()) {
+ sum += connection.getFlowFileQueue().size().getObjectCount();
+ }
+ return sum;
+ }
+
+ @Override
+ public void resumeWork(final Connectable connectable) {
+ synchronized (workMonitor) {
+ final int workCount = getWorkCount(connectable);
+ final Worker worker = new Worker(connectable);
+ workerMap.put(connectable, worker);
+
+ if (workCount > 0) {
+ worker.setWorkCount(workCount);
+ workerQueue.offer(worker);
+ workMonitor.notify();
+ }
+ }
+ }
+
+ @Override
+ public void suspendWork(final Connectable connectable) {
+ synchronized (workMonitor) {
+ final Worker worker = this.workerMap.remove(connectable);
+ if (worker == null) {
+ return;
+ }
+
+ worker.resetWorkCount();
+ workerQueue.remove(worker);
+ }
+ }
+
+ public static class Worker implements EventBasedWorker {
+
+ private final Connectable connectable;
+ private final AtomicInteger workCount = new AtomicInteger(0);
+
+ public Worker(final Connectable connectable) {
+ this.connectable = connectable;
+ }
+
+ @Override
+ public Connectable getConnectable() {
+ return connectable;
+ }
+
+ @Override
+ public int decrementEventCount() {
+ return workCount.decrementAndGet();
+ }
+
+ @Override
+ public int incrementEventCount() {
+ return workCount.getAndIncrement();
+ }
+
+ void resetWorkCount() {
+ workCount.set(0);
+ }
+
+ void setWorkCount(final int workCount) {
+ this.workCount.set(workCount);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static class WorkerReadyQueue extends LinkedList<Worker> {
+
+ private final ProcessScheduler scheduler;
+
+ private volatile boolean clustered = false;
+ private volatile boolean primary = false;
+
+ public WorkerReadyQueue(final ProcessScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ public void setClustered(final boolean clustered) {
+ this.clustered = clustered;
+ }
+
+ public void setPrimary(final boolean primary) {
+ this.primary = primary;
+ }
+
+ @Override
+ public Worker poll() {
+ final List<Worker> putBack = new ArrayList<>();
+
+ Worker worker;
+ try {
+ while ((worker = super.poll()) != null) {
+ final DelayProcessingReason reason = getDelayReason(worker);
+ if (reason == null) {
+ return worker;
+ } else {
+ // Worker is not ready. We may want to add him back to the queue, depending on the reason that he is unready.
+ switch (reason) {
+ case YIELDED:
+ case ISOLATED:
+ case DESTINATION_FULL:
+ case ALL_WORK_PENALIZED:
+ case NO_WORK:
+ case TOO_MANY_THREADS:
+ // there will not be an event that triggers this to happen, so we add this worker back to the queue.
+ putBack.add(worker);
+ break;
+ default:
+ case NOT_RUNNING:
+ // There's no need to check if this worker is available again until a another event
+ // occurs. Therefore, we keep him off of the queue and reset his work count
+ worker.resetWorkCount();
+ break;
+ }
+ }
+ }
+ } finally {
+ if (!putBack.isEmpty()) {
+ super.addAll(putBack);
+ }
+ }
+
+ return null;
+ }
+
+ private DelayProcessingReason getDelayReason(final Worker worker) {
+ final Connectable connectable = worker.getConnectable();
+
+ if (ScheduledState.RUNNING != connectable.getScheduledState()) {
+ return DelayProcessingReason.NOT_RUNNING;
+ }
+
+ if (connectable.getYieldExpiration() > System.currentTimeMillis()) {
+ return DelayProcessingReason.YIELDED;
+ }
+
+ // For Remote Output Ports,
+ int availableRelationshipCount = 0;
+ if (!connectable.getRelationships().isEmpty()) {
+ availableRelationshipCount = getAvailableRelationshipCount(connectable);
+
+ if (availableRelationshipCount == 0) {
+ return DelayProcessingReason.DESTINATION_FULL;
+ }
+ }
+
+ if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) {
+ return DelayProcessingReason.NO_WORK;
+ }
+
+ final int activeThreadCount = scheduler.getActiveThreadCount(worker.getConnectable());
+ final int maxThreadCount = worker.getConnectable().getMaxConcurrentTasks();
+ if (maxThreadCount > 0 && activeThreadCount >= maxThreadCount) {
+ return DelayProcessingReason.TOO_MANY_THREADS;
+ }
+
+ if (connectable instanceof ProcessorNode) {
+ final ProcessorNode procNode = (ProcessorNode) connectable;
+ if (procNode.isIsolated() && clustered && !primary) {
+ return DelayProcessingReason.ISOLATED;
+ }
+
+ final boolean triggerWhenAnyAvailable = procNode.isTriggerWhenAnyDestinationAvailable();
+ final boolean allDestinationsAvailable = availableRelationshipCount == procNode.getRelationships().size();
+ if (!triggerWhenAnyAvailable && !allDestinationsAvailable) {
+ return DelayProcessingReason.DESTINATION_FULL;
+ }
+ }
+
+ return null;
+ }
+
+ private int getAvailableRelationshipCount(final Connectable connectable) {
+ int count = 0;
+ for (final Relationship relationship : connectable.getRelationships()) {
+ final Collection<Connection> connections = connectable.getConnections(relationship);
+
+ if (connections == null || connections.isEmpty()) {
+ if (connectable.isAutoTerminated(relationship)) {
+ // If the relationship is auto-terminated, consider it available.
+ count++;
+ }
+ } else {
+ boolean available = true;
+ for (final Connection connection : connections) {
+ if (connection.getSource() == connection.getDestination()) {
+ // don't count self-loops
+ continue;
+ }
+
+ if (connection.getFlowFileQueue().isFull()) {
+ available = false;
+ }
+ }
+
+ if (available) {
+ count++;
+ }
+ }
+ }
+
+ return count;
+ }
+ }
+
+ private static enum DelayProcessingReason {
+
+ YIELDED,
+ DESTINATION_FULL,
+ NO_WORK,
+ ALL_WORK_PENALIZED,
+ ISOLATED,
+ NOT_RUNNING,
+ TOO_MANY_THREADS;
+ }
+}