You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:29 UTC

[06/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
new file mode 100644
index 0000000..56432d5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public enum TransferDirection {
+
+    SEND,
+    RECEIVE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
new file mode 100644
index 0000000..bfccd98
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public interface VersionedRemoteResource {
+
+    VersionNegotiator getVersionNegotiator();
+
+    String getResourceName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
new file mode 100644
index 0000000..b4206b3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * <p>
+ * Provides a mechanism for encoding and decoding FlowFiles as streams so that
+ * they can be transferred remotely.
+ * </p>
+ */
+public interface FlowFileCodec extends VersionedRemoteResource {
+
+    /**
+     * Returns a List of all versions that this codec is able to support, in the
+     * order that they are preferred by the codec
+     *
+     * @return
+     */
+    public List<Integer> getSupportedVersions();
+
+    /**
+     * Encodes a FlowFile and its content as a single stream of data and writes
+     * that stream to the output. If checksum is not null, it will be calculated
+     * as the stream is read
+     *
+     * @param flowFile the FlowFile to encode
+     * @param session a session that can be used to transactionally create and
+     * transfer flow files
+     * @param outStream the stream to write the data to
+     *
+     * @return the updated FlowFile
+     *
+     * @throws IOException
+     */
+    FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException;
+
+    /**
+     * Decodes the contents of the InputStream, interpreting the data to
+     * determine the next FlowFile's attributes and content, as well as their
+     * destinations. If not null, checksum will be used to calculate the
+     * checksum as the data is read.
+     *
+     * @param stream an InputStream containing FlowFiles' contents, attributes,
+     * and destinations
+     * @param session
+     *
+     * @return the FlowFile that was created, or <code>null</code> if the stream
+     * was out of data
+     *
+     * @throws IOException
+     * @throws ProtocolException if the input is malformed
+     */
+    FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
new file mode 100644
index 0000000..f6c2f4f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class BadRequestException extends Exception {
+
+    private static final long serialVersionUID = -8034602852256106560L;
+
+    public BadRequestException(final String message) {
+        super(message);
+    }
+
+    public BadRequestException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
new file mode 100644
index 0000000..b61fc65
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class HandshakeException extends Exception {
+
+    private static final long serialVersionUID = 178192341908726L;
+
+    public HandshakeException(final String message) {
+        super(message);
+    }
+
+    public HandshakeException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
new file mode 100644
index 0000000..24ff3a5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class NotAuthorizedException extends Exception {
+
+    private static final long serialVersionUID = 2952623568114035498L;
+
+    public NotAuthorizedException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
new file mode 100644
index 0000000..af0f467
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class PortNotRunningException extends Exception {
+
+    private static final long serialVersionUID = -2790940982005516375L;
+
+    public PortNotRunningException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
new file mode 100644
index 0000000..0f50b98
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class ProtocolException extends Exception {
+
+    private static final long serialVersionUID = 5763900324505818495L;
+
+    public ProtocolException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    public ProtocolException(final String message) {
+        super(message);
+    }
+
+    public ProtocolException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
new file mode 100644
index 0000000..dd675b3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Used to indicate that by the time the request was serviced, it had already
+ * expired
+ */
+public class RequestExpiredException extends Exception {
+
+    private static final long serialVersionUID = -7037025330562827852L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
new file mode 100644
index 0000000..e6a0fe7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class UnknownPortException extends Exception {
+
+    private static final long serialVersionUID = -2790940982005516375L;
+
+    public UnknownPortException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
new file mode 100644
index 0000000..32274eb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+
+public interface ClientProtocol extends VersionedRemoteResource {
+
+    void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException;
+
+    Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException;
+
+    FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
+
+    void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    void shutdown(Peer peer) throws IOException, ProtocolException;
+
+    boolean isReadyForFileTransfer();
+
+    /**
+     * returns <code>true</code> if remote instance indicates that the port is
+     * invalid
+     *
+     * @return
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isPortInvalid() throws IllegalStateException;
+
+    /**
+     * returns <code>true</code> if remote instance indicates that the port is
+     * unknown
+     *
+     * @return
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isPortUnknown();
+
+    /**
+     * returns <code>true</code> if remote instance indicates that the port's
+     * destination is full
+     *
+     * @return
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isDestinationFull();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
new file mode 100644
index 0000000..d2e2946
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface CommunicationsInput {
+
+    InputStream getInputStream() throws IOException;
+
+    long getBytesRead();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
new file mode 100644
index 0000000..95cab29
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface CommunicationsOutput {
+
+    OutputStream getOutputStream() throws IOException;
+
+    long getBytesWritten();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
new file mode 100644
index 0000000..d009cec
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface CommunicationsSession extends Closeable {
+
+    public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
+
+    CommunicationsInput getInput();
+
+    CommunicationsOutput getOutput();
+
+    void setTimeout(int millis) throws IOException;
+
+    int getTimeout() throws IOException;
+
+    void setUri(String uri);
+
+    String getUri();
+
+    String getUserDn();
+
+    void setUserDn(String dn);
+
+    boolean isDataAvailable();
+
+    long getBytesWritten();
+
+    long getBytesRead();
+
+    /**
+     * Asynchronously interrupts this FlowFileCodec. Implementations must ensure
+     * that they stop sending and receiving data as soon as possible after this
+     * method has been called, even if doing so results in sending only partial
+     * data to the peer. This will usually result in the peer throwing a
+     * SocketTimeoutException.
+     */
+    void interrupt();
+
+    /**
+     * Returns <code>true</code> if the connection is closed, <code>false</code>
+     * otherwise.
+     *
+     * @return
+     */
+    boolean isClosed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
new file mode 100644
index 0000000..41334fe
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public enum RequestType {
+
+    NEGOTIATE_FLOWFILE_CODEC,
+    REQUEST_PEER_LIST,
+    SEND_FLOWFILES,
+    RECEIVE_FLOWFILES,
+    SHUTDOWN;
+
+    public void writeRequestType(final DataOutputStream dos) throws IOException {
+        dos.writeUTF(name());
+    }
+
+    public static RequestType readRequestType(final DataInputStream dis) throws IOException {
+        final String requestTypeVal = dis.readUTF();
+        try {
+            return RequestType.valueOf(requestTypeVal);
+        } catch (final Exception e) {
+            throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
new file mode 100644
index 0000000..0d18f2e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+
+import org.apache.nifi.cluster.NodeInformant;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.ProtocolException;
+
+public interface ServerProtocol extends VersionedRemoteResource {
+
+    /**
+     *
+     * @param rootGroup
+     */
+    void setRootProcessGroup(ProcessGroup rootGroup);
+
+    RootGroupPort getPort();
+
+    /**
+     * Optional operation. Sets the NodeInformant to use in this Protocol, if a
+     * NodeInformant is supported. Otherwise, throws
+     * UnsupportedOperationException
+     *
+     * @param nodeInformant
+     */
+    void setNodeInformant(NodeInformant nodeInformant);
+
+    /**
+     * Receives the handshake from the Peer
+     *
+     * @param peer
+     * @throws IOException
+     * @throws HandshakeException
+     */
+    void handshake(Peer peer) throws IOException, HandshakeException;
+
+    /**
+     * Returns <code>true</code> if the handshaking process was completed
+     * successfully, <code>false</code> if either the handshaking process has
+     * not happened or the handshake failed
+     *
+     * @return
+     */
+    boolean isHandshakeSuccessful();
+
+    /**
+     * Negotiates the FlowFileCodec that is to be used for transferring
+     * FlowFiles
+     *
+     * @param peer
+     * @return
+     * @throws IOException
+     * @throws BadRequestException
+     */
+    FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
+
+    /**
+     * Returns the codec that has already been negotiated by this Protocol, if
+     * any.
+     *
+     * @return
+     */
+    FlowFileCodec getPreNegotiatedCodec();
+
+    /**
+     * Reads the Request Type of the next request from the Peer
+     *
+     * @return the RequestType that the peer would like to happen - or null, if
+     * no data available
+     */
+    RequestType getRequestType(Peer peer) throws IOException;
+
+    /**
+     * Sends FlowFiles to the specified peer
+     *
+     * @param peer
+     * @param context
+     * @param session
+     * @param codec
+     *
+     * @return the number of FlowFiles transferred
+     */
+    int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    /**
+     * Receives FlowFiles from the specified peer
+     *
+     * @param peer
+     * @param context
+     * @param session
+     * @param codec
+     * @throws IOException
+     *
+     * @return the number of FlowFiles received
+     * @throws ProtocolException
+     */
+    int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    /**
+     * Returns the number of milliseconds after a request is received for which
+     * the request is still valid. A valid of 0 indicates that the request will
+     * not expire.
+     *
+     * @return
+     */
+    long getRequestExpiration();
+
+    /**
+     * Sends a list of all nodes in the cluster to the specified peer. If not in
+     * a cluster, sends info about itself
+     *
+     * @param peer
+     */
+    void sendPeerList(Peer peer) throws IOException;
+
+    void shutdown(Peer peer);
+
+    boolean isShutdown();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/.gitignore
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/.gitignore b/nar-bundles/framework-bundle/framework/core/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/pom.xml b/nar-bundles/framework-bundle/framework/core/pom.xml
new file mode 100644
index 0000000..547c75d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-framework-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>framework-core</artifactId>
+    <packaging>jar</packaging>
+    <name>NiFi Framework Core</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>core-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-expression-language</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-file-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>site-to-site</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-core-flowfile-attributes</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>framework-cluster-protocol</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-logging-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>client-dto</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.quartz-scheduler</groupId>
+            <artifactId>quartz</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jasypt</groupId>
+            <artifactId>jasypt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk16</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>data-provenance-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>wali</groupId>
+            <artifactId>wali</artifactId>
+            <version>3.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
new file mode 100644
index 0000000..1249657
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Set;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.jaxb.BulletinAdapter;
+import org.apache.nifi.reporting.Bulletin;
+
+/**
+ * The payload of the bulletins.
+ *
+ * @author unattributed
+ */
+@XmlRootElement
+public class BulletinsPayload {
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(BulletinsPayload.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+    private Set<Bulletin> bulletins;
+
+    @XmlJavaTypeAdapter(BulletinAdapter.class)
+    public Set<Bulletin> getBulletins() {
+        return bulletins;
+    }
+
+    public void setBulletins(final Set<Bulletin> bulletins) {
+        this.bulletins = bulletins;
+    }
+
+    public byte[] marshal() throws ProtocolException {
+        final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
+        marshal(this, payloadBytes);
+        return payloadBytes.toByteArray();
+    }
+
+    public static void marshal(final BulletinsPayload payload, final OutputStream os) throws ProtocolException {
+        try {
+            final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(payload, os);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    public static BulletinsPayload unmarshal(final InputStream is) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (BulletinsPayload) unmarshaller.unmarshal(is);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    public static BulletinsPayload unmarshal(final byte[] bytes) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (BulletinsPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes));
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
new file mode 100644
index 0000000..986e904
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+/**
+ * Represents the exceptional case when connection to the cluster fails.
+ *
+ * @author unattributed
+ */
+public class ConnectionException extends RuntimeException {
+
+    private static final long serialVersionUID = -1378294897231234028L;
+
+    public ConnectionException() {
+    }
+
+    public ConnectionException(String msg) {
+        super(msg);
+    }
+
+    public ConnectionException(Throwable cause) {
+        super(cause);
+    }
+
+    public ConnectionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
new file mode 100644
index 0000000..55707f3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+/**
+ * Represents the exceptional case when disconnection from the cluster fails.
+ *
+ * @author unattributed
+ */
+public class DisconnectionException extends RuntimeException {
+
+    private static final long serialVersionUID = 6648876367997026125L;
+
+    public DisconnectionException() {
+    }
+
+    public DisconnectionException(String msg) {
+        super(msg);
+    }
+
+    public DisconnectionException(Throwable cause) {
+        super(cause);
+    }
+
+    public DisconnectionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
new file mode 100644
index 0000000..093b238
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.controller.Counter;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.jaxb.CounterAdapter;
+
+/**
+ * The payload of the heartbeat. The payload contains status to inform the
+ * cluster manager the current workload of this node.
+ *
+ * @author unattributed
+ */
+@XmlRootElement
+public class HeartbeatPayload {
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+    private List<Counter> counters;
+    private ProcessGroupStatus processGroupStatus;
+    private int activeThreadCount;
+    private long totalFlowFileCount;
+    private long totalFlowFileBytes;
+    private SystemDiagnostics systemDiagnostics;
+    private Integer siteToSitePort;
+    private boolean siteToSiteSecure;
+    private long systemStartTime;
+
+    @XmlJavaTypeAdapter(CounterAdapter.class)
+    public List<Counter> getCounters() {
+        return counters;
+    }
+
+    public void setCounters(final List<Counter> counters) {
+        this.counters = counters;
+    }
+
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    public void setActiveThreadCount(final int activeThreadCount) {
+        this.activeThreadCount = activeThreadCount;
+    }
+
+    public long getTotalFlowFileCount() {
+        return totalFlowFileCount;
+    }
+
+    public void setTotalFlowFileCount(final long totalFlowFileCount) {
+        this.totalFlowFileCount = totalFlowFileCount;
+    }
+
+    public long getTotalFlowFileBytes() {
+        return totalFlowFileBytes;
+    }
+
+    public void setTotalFlowFileBytes(final long totalFlowFileBytes) {
+        this.totalFlowFileBytes = totalFlowFileBytes;
+    }
+
+    public ProcessGroupStatus getProcessGroupStatus() {
+        return processGroupStatus;
+    }
+
+    public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) {
+        this.processGroupStatus = processGroupStatus;
+    }
+
+    public SystemDiagnostics getSystemDiagnostics() {
+        return systemDiagnostics;
+    }
+
+    public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) {
+        this.systemDiagnostics = systemDiagnostics;
+    }
+
+    public boolean isSiteToSiteSecure() {
+        return siteToSiteSecure;
+    }
+
+    public void setSiteToSiteSecure(final boolean secure) {
+        this.siteToSiteSecure = secure;
+    }
+
+    public Integer getSiteToSitePort() {
+        return siteToSitePort;
+    }
+
+    public void setSiteToSitePort(final Integer port) {
+        this.siteToSitePort = port;
+    }
+
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    public void setSystemStartTime(final long systemStartTime) {
+        this.systemStartTime = systemStartTime;
+    }
+
+    public byte[] marshal() throws ProtocolException {
+        final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
+        marshal(this, payloadBytes);
+        return payloadBytes.toByteArray();
+    }
+
+    public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException {
+        try {
+            final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(payload, os);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (HeartbeatPayload) unmarshaller.unmarshal(is);
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+
+    public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException {
+        try {
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            return (HeartbeatPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes));
+        } catch (final JAXBException je) {
+            throw new ProtocolException(je);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
new file mode 100644
index 0000000..1efa0cd
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractPort;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+/**
+ * Provides a mechanism by which <code>FlowFile</code>s can be transferred into
+ * and out of a <code>ProcessGroup</code> to and/or from another
+ * <code>ProcessGroup</code> within the same instance of NiFi.
+ */
+public class LocalPort extends AbstractPort {
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    public LocalPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
+        super(id, name, processGroup, type, scheduler);
+    }
+
+    @Override
+    public boolean isValid() {
+        return !getConnections(Relationship.ANONYMOUS).isEmpty();
+    }
+
+    @Override
+    public Collection<ValidationResult> getValidationErrors() {
+        final Collection<ValidationResult> validationErrors = new ArrayList<>();
+        if (!isValid()) {
+            final ValidationResult error = new ValidationResult.Builder()
+                    .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
+                    .subject(String.format("Port '%s'", getName()))
+                    .valid(false)
+                    .build();
+            validationErrors.add(error);
+        }
+        return validationErrors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        readLock.lock();
+        try {
+            final List<FlowFile> flowFiles = session.get(10);
+            if (flowFiles.isEmpty()) {
+                context.yield();
+            } else {
+                session.transfer(flowFiles, Relationship.ANONYMOUS);
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public void updateConnection(final Connection connection) throws IllegalStateException {
+        writeLock.lock();
+        try {
+            super.updateConnection(connection);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public void addConnection(final Connection connection) throws IllegalArgumentException {
+        writeLock.lock();
+        try {
+            super.addConnection(connection);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
+        writeLock.lock();
+        try {
+            super.removeConnection(connection);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public Set<Connection> getConnections() {
+        readLock.lock();
+        try {
+            return super.getConnections();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public Set<Connection> getConnections(Relationship relationship) {
+        readLock.lock();
+        try {
+            return super.getConnections(relationship);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public List<Connection> getIncomingConnections() {
+        readLock.lock();
+        try {
+            return super.getIncomingConnections();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean hasIncomingConnection() {
+        readLock.lock();
+        try {
+            return super.hasIncomingConnection();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isTriggerWhenEmpty() {
+        return false;
+    }
+
+    @Override
+    public SchedulingStrategy getSchedulingStrategy() {
+        return SchedulingStrategy.EVENT_DRIVEN;
+    }
+
+    @Override
+    public boolean isSideEffectFree() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
new file mode 100644
index 0000000..1d723b5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Models a connection between connectable components. A connection may contain
+ * one or more relationships that map the source component to the destination
+ * component.
+ */
+public final class StandardConnection implements Connection {
+
+    private final String id;
+    private final AtomicReference<ProcessGroup> processGroup;
+    private final AtomicReference<String> name;
+    private final AtomicReference<List<Position>> bendPoints;
+    private final Connectable source;
+    private final AtomicReference<Connectable> destination;
+    private final AtomicReference<Collection<Relationship>> relationships;
+    private final StandardFlowFileQueue flowFileQueue;
+    private final AtomicInteger labelIndex = new AtomicInteger(1);
+    private final AtomicLong zIndex = new AtomicLong(0L);
+    private final ProcessScheduler scheduler;
+    private final int hashCode;
+
+    private StandardConnection(final Builder builder) {
+        id = builder.id;
+        name = new AtomicReference<>(builder.name);
+        bendPoints = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(builder.bendPoints)));
+        processGroup = new AtomicReference<>(builder.processGroup);
+        source = builder.source;
+        destination = new AtomicReference<>(builder.destination);
+        relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
+        scheduler = builder.scheduler;
+        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold());
+        hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
+    }
+
+    public ProcessGroup getProcessGroup() {
+        return processGroup.get();
+    }
+
+    public String getIdentifier() {
+        return id;
+    }
+
+    public String getName() {
+        return name.get();
+    }
+
+    public void setName(final String name) {
+        this.name.set(name);
+    }
+
+    @Override
+    public List<Position> getBendPoints() {
+        return bendPoints.get();
+    }
+
+    @Override
+    public void setBendPoints(final List<Position> position) {
+        this.bendPoints.set(Collections.unmodifiableList(new ArrayList<>(position)));
+    }
+
+    public int getLabelIndex() {
+        return labelIndex.get();
+    }
+
+    public void setLabelIndex(final int labelIndex) {
+        this.labelIndex.set(labelIndex);
+    }
+
+    @Override
+    public long getZIndex() {
+        return zIndex.get();
+    }
+
+    @Override
+    public void setZIndex(final long zIndex) {
+        this.zIndex.set(zIndex);
+    }
+
+    public Connectable getSource() {
+        return source;
+    }
+
+    public Connectable getDestination() {
+        return destination.get();
+    }
+
+    public Collection<Relationship> getRelationships() {
+        return relationships.get();
+    }
+
+    public FlowFileQueue getFlowFileQueue() {
+        return flowFileQueue;
+    }
+
+    public void setProcessGroup(final ProcessGroup newGroup) {
+        final ProcessGroup currentGroup = this.processGroup.get();
+        try {
+            this.processGroup.set(newGroup);
+        } catch (final RuntimeException e) {
+            this.processGroup.set(currentGroup);
+            throw e;
+        }
+    }
+
+    public void setRelationships(final Collection<Relationship> newRelationships) {
+        final Collection<Relationship> currentRelationships = relationships.get();
+        if (currentRelationships.equals(newRelationships)) {
+            return;
+        }
+
+        if (getSource().isRunning()) {
+            throw new IllegalStateException("Cannot update the relationships for Connection because the source of the Connection is running");
+        }
+
+        try {
+            this.relationships.set(new ArrayList<>(newRelationships));
+            getSource().updateConnection(this);
+        } catch (final RuntimeException e) {
+            this.relationships.set(currentRelationships);
+            throw e;
+        }
+    }
+
+    public void setDestination(final Connectable newDestination) {
+        final Connectable previousDestination = destination.get();
+        if (previousDestination.equals(newDestination)) {
+            return;
+        }
+
+        if (previousDestination.isRunning() && !(previousDestination instanceof Funnel || previousDestination instanceof LocalPort)) {
+            throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
+        }
+
+        try {
+            previousDestination.removeConnection(this);
+            this.destination.set(newDestination);
+            getSource().updateConnection(this);
+
+            newDestination.addConnection(this);
+            scheduler.registerEvent(newDestination);
+        } catch (final RuntimeException e) {
+            this.destination.set(previousDestination);
+            throw e;
+        }
+    }
+
+    @Override
+    public void lock() {
+        flowFileQueue.lock();
+    }
+
+    @Override
+    public void unlock() {
+        flowFileQueue.unlock();
+    }
+
+    @Override
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+        return flowFileQueue.poll(filter, expiredRecords);
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if (!(other instanceof Connection)) {
+            return false;
+        }
+        final Connection con = (Connection) other;
+        return new EqualsBuilder().append(id, con.getIdentifier()).isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        return "Connection[ID=" + id + ",Name=" + name.get() + ",Source=" + getSource() + ",Destination=" + getDestination() + ",Relationships=" + getRelationships();
+    }
+
+    /**
+     * Gives this Connection ownership of the given FlowFile and allows the
+     * Connection to hold on to the FlowFile but NOT provide the FlowFile to
+     * consumers. This allows us to ensure that the Connection is not deleted
+     * during the middle of a Session commit.
+     *
+     * @param flowFile
+     */
+    @Override
+    public void enqueue(final FlowFileRecord flowFile) {
+        flowFileQueue.put(flowFile);
+    }
+
+    @Override
+    public void enqueue(final Collection<FlowFileRecord> flowFiles) {
+        flowFileQueue.putAll(flowFiles);
+    }
+
+    public static class Builder {
+
+        private final ProcessScheduler scheduler;
+
+        private String id = UUID.randomUUID().toString();
+        private String name;
+        private List<Position> bendPoints = new ArrayList<>();
+        private ProcessGroup processGroup;
+        private Connectable source;
+        private Connectable destination;
+        private Collection<Relationship> relationships;
+
+        public Builder(final ProcessScheduler scheduler) {
+            this.scheduler = scheduler;
+        }
+
+        public Builder id(final String id) {
+            this.id = id;
+            return this;
+        }
+
+        public Builder source(final Connectable source) {
+            this.source = source;
+            return this;
+        }
+
+        public Builder processGroup(final ProcessGroup group) {
+            this.processGroup = group;
+            return this;
+        }
+
+        public Builder destination(final Connectable destination) {
+            this.destination = destination;
+            return this;
+        }
+
+        public Builder relationships(final Collection<Relationship> relationships) {
+            this.relationships = new ArrayList<>(relationships);
+            return this;
+        }
+
+        public Builder name(final String name) {
+            this.name = name;
+            return this;
+        }
+
+        public Builder bendPoints(final List<Position> bendPoints) {
+            this.bendPoints.clear();
+            this.bendPoints.addAll(bendPoints);
+            return this;
+        }
+
+        public Builder addBendPoint(final Position bendPoint) {
+            bendPoints.add(bendPoint);
+            return this;
+        }
+
+        public StandardConnection build() {
+            if (source == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Source");
+            }
+            if (destination == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Destination");
+            }
+
+            if (relationships == null) {
+                relationships = new ArrayList<>();
+            }
+
+            if (relationships.isEmpty()) {
+                // ensure relationships have been specified for processors, otherwise the anonymous relationship is used
+                if (source.getConnectableType() == ConnectableType.PROCESSOR) {
+                    throw new IllegalStateException("Cannot build a Connection without any relationships");
+                }
+                relationships.add(Relationship.ANONYMOUS);
+            }
+
+            return new StandardConnection(this);
+        }
+    }
+
+    @Override
+    public void verifyCanUpdate() {
+        // StandardConnection can always be updated
+    }
+
+    @Override
+    public void verifyCanDelete() {
+        if (!flowFileQueue.isEmpty()) {
+            throw new IllegalStateException("Queue not empty for " + this);
+        }
+
+        if (source.isRunning()) {
+            if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) {
+                throw new IllegalStateException("Source of Connection (" + source + ") is running");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
new file mode 100644
index 0000000..f36a459
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.Connectables;
+
+public class EventDrivenWorkerQueue implements WorkerQueue {
+
+    private final Object workMonitor = new Object();
+
+    private final Map<Connectable, Worker> workerMap = new HashMap<>();   // protected by synchronizing on workMonitor
+    private final WorkerReadyQueue workerQueue;
+
+    public EventDrivenWorkerQueue(final boolean clustered, final boolean primary, final ProcessScheduler scheduler) {
+        workerQueue = new WorkerReadyQueue(scheduler);
+        workerQueue.setClustered(clustered);
+        workerQueue.setPrimary(primary);
+    }
+
+    @Override
+    public void setClustered(final boolean clustered) {
+        workerQueue.setClustered(clustered);
+    }
+
+    @Override
+    public void setPrimary(final boolean primary) {
+        workerQueue.setPrimary(primary);
+    }
+
+    @Override
+    public Worker poll(final long timeout, final TimeUnit timeUnit) {
+        final long maxTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+        while (System.currentTimeMillis() < maxTime) {
+            synchronized (workMonitor) {
+                final Worker worker = workerQueue.poll();
+                if (worker == null) {
+                    // nothing to do. wait until we have something to do.
+                    final long timeLeft = maxTime - System.currentTimeMillis();
+                    if (timeLeft <= 0) {
+                        return null;
+                    }
+
+                    try {
+                        workMonitor.wait(timeLeft);
+                    } catch (final InterruptedException ignored) {
+                    }
+                } else {
+                    // Decrement the amount of work there is to do for this worker.
+                    final int workLeft = worker.decrementEventCount();
+                    if (workLeft > 0) {
+                        workerQueue.offer(worker);
+                    }
+
+                    return worker;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public void offer(final Connectable connectable) {
+        synchronized (workMonitor) {
+            Worker worker = workerMap.get(connectable);
+            if (worker == null) {
+                // if worker is null, then it has not been scheduled to run; ignore the event.
+                return;
+            }
+
+            final int countBefore = worker.incrementEventCount();
+            if (countBefore < 0) {
+                worker.setWorkCount(1);
+            }
+            if (countBefore <= 0) {
+                // If countBefore > 0 then it's already on the queue, so just incrementing its counter is sufficient.
+                workerQueue.offer(worker);
+            }
+
+            workMonitor.notify();
+        }
+    }
+
+    private int getWorkCount(final Connectable connectable) {
+        int sum = 0;
+        for (final Connection connection : connectable.getIncomingConnections()) {
+            sum += connection.getFlowFileQueue().size().getObjectCount();
+        }
+        return sum;
+    }
+
+    @Override
+    public void resumeWork(final Connectable connectable) {
+        synchronized (workMonitor) {
+            final int workCount = getWorkCount(connectable);
+            final Worker worker = new Worker(connectable);
+            workerMap.put(connectable, worker);
+
+            if (workCount > 0) {
+                worker.setWorkCount(workCount);
+                workerQueue.offer(worker);
+                workMonitor.notify();
+            }
+        }
+    }
+
+    @Override
+    public void suspendWork(final Connectable connectable) {
+        synchronized (workMonitor) {
+            final Worker worker = this.workerMap.remove(connectable);
+            if (worker == null) {
+                return;
+            }
+
+            worker.resetWorkCount();
+            workerQueue.remove(worker);
+        }
+    }
+
+    public static class Worker implements EventBasedWorker {
+
+        private final Connectable connectable;
+        private final AtomicInteger workCount = new AtomicInteger(0);
+
+        public Worker(final Connectable connectable) {
+            this.connectable = connectable;
+        }
+
+        @Override
+        public Connectable getConnectable() {
+            return connectable;
+        }
+
+        @Override
+        public int decrementEventCount() {
+            return workCount.decrementAndGet();
+        }
+
+        @Override
+        public int incrementEventCount() {
+            return workCount.getAndIncrement();
+        }
+
+        void resetWorkCount() {
+            workCount.set(0);
+        }
+
+        void setWorkCount(final int workCount) {
+            this.workCount.set(workCount);
+        }
+    }
+
+    @SuppressWarnings("serial")
+    private static class WorkerReadyQueue extends LinkedList<Worker> {
+
+        private final ProcessScheduler scheduler;
+
+        private volatile boolean clustered = false;
+        private volatile boolean primary = false;
+
+        public WorkerReadyQueue(final ProcessScheduler scheduler) {
+            this.scheduler = scheduler;
+        }
+
+        public void setClustered(final boolean clustered) {
+            this.clustered = clustered;
+        }
+
+        public void setPrimary(final boolean primary) {
+            this.primary = primary;
+        }
+
+        @Override
+        public Worker poll() {
+            final List<Worker> putBack = new ArrayList<>();
+
+            Worker worker;
+            try {
+                while ((worker = super.poll()) != null) {
+                    final DelayProcessingReason reason = getDelayReason(worker);
+                    if (reason == null) {
+                        return worker;
+                    } else {
+                        // Worker is not ready. We may want to add him back to the queue, depending on the reason that he is unready.
+                        switch (reason) {
+                            case YIELDED:
+                            case ISOLATED:
+                            case DESTINATION_FULL:
+                            case ALL_WORK_PENALIZED:
+                            case NO_WORK:
+                            case TOO_MANY_THREADS:
+                                // there will not be an event that triggers this to happen, so we add this worker back to the queue.
+                                putBack.add(worker);
+                                break;
+                            default:
+                            case NOT_RUNNING:
+                                // There's no need to check if this worker is available again until a another event
+                                // occurs. Therefore, we keep him off of the queue and reset his work count
+                                worker.resetWorkCount();
+                                break;
+                        }
+                    }
+                }
+            } finally {
+                if (!putBack.isEmpty()) {
+                    super.addAll(putBack);
+                }
+            }
+
+            return null;
+        }
+
+        private DelayProcessingReason getDelayReason(final Worker worker) {
+            final Connectable connectable = worker.getConnectable();
+
+            if (ScheduledState.RUNNING != connectable.getScheduledState()) {
+                return DelayProcessingReason.NOT_RUNNING;
+            }
+
+            if (connectable.getYieldExpiration() > System.currentTimeMillis()) {
+                return DelayProcessingReason.YIELDED;
+            }
+
+            // For Remote Output Ports,
+            int availableRelationshipCount = 0;
+            if (!connectable.getRelationships().isEmpty()) {
+                availableRelationshipCount = getAvailableRelationshipCount(connectable);
+
+                if (availableRelationshipCount == 0) {
+                    return DelayProcessingReason.DESTINATION_FULL;
+                }
+            }
+
+            if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) {
+                return DelayProcessingReason.NO_WORK;
+            }
+
+            final int activeThreadCount = scheduler.getActiveThreadCount(worker.getConnectable());
+            final int maxThreadCount = worker.getConnectable().getMaxConcurrentTasks();
+            if (maxThreadCount > 0 && activeThreadCount >= maxThreadCount) {
+                return DelayProcessingReason.TOO_MANY_THREADS;
+            }
+
+            if (connectable instanceof ProcessorNode) {
+                final ProcessorNode procNode = (ProcessorNode) connectable;
+                if (procNode.isIsolated() && clustered && !primary) {
+                    return DelayProcessingReason.ISOLATED;
+                }
+
+                final boolean triggerWhenAnyAvailable = procNode.isTriggerWhenAnyDestinationAvailable();
+                final boolean allDestinationsAvailable = availableRelationshipCount == procNode.getRelationships().size();
+                if (!triggerWhenAnyAvailable && !allDestinationsAvailable) {
+                    return DelayProcessingReason.DESTINATION_FULL;
+                }
+            }
+
+            return null;
+        }
+
+        private int getAvailableRelationshipCount(final Connectable connectable) {
+            int count = 0;
+            for (final Relationship relationship : connectable.getRelationships()) {
+                final Collection<Connection> connections = connectable.getConnections(relationship);
+
+                if (connections == null || connections.isEmpty()) {
+                    if (connectable.isAutoTerminated(relationship)) {
+                        // If the relationship is auto-terminated, consider it available.
+                        count++;
+                    }
+                } else {
+                    boolean available = true;
+                    for (final Connection connection : connections) {
+                        if (connection.getSource() == connection.getDestination()) {
+                            // don't count self-loops
+                            continue;
+                        }
+
+                        if (connection.getFlowFileQueue().isFull()) {
+                            available = false;
+                        }
+                    }
+
+                    if (available) {
+                        count++;
+                    }
+                }
+            }
+
+            return count;
+        }
+    }
+
+    private static enum DelayProcessingReason {
+
+        YIELDED,
+        DESTINATION_FULL,
+        NO_WORK,
+        ALL_WORK_PENALIZED,
+        ISOLATED,
+        NOT_RUNNING,
+        TOO_MANY_THREADS;
+    }
+}