You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/31 04:43:46 UTC
[09/62] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
deleted file mode 100644
index 4afdfb7..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.connectable.Port;
-import org.apache.nifi.remote.exception.BadRequestException;
-import org.apache.nifi.remote.exception.NotAuthorizedException;
-import org.apache.nifi.remote.exception.RequestExpiredException;
-import org.apache.nifi.remote.protocol.ServerProtocol;
-
-public interface RootGroupPort extends Port {
-
- boolean isTransmitting();
-
- void setGroupAccessControl(Set<String> groups);
-
- Set<String> getGroupAccessControl();
-
- void setUserAccessControl(Set<String> users);
-
- Set<String> getUserAccessControl();
-
- /**
- * Verifies that the specified user is authorized to interact with this port
- * and returns a {@link PortAuthorizationResult} indicating why the user is
- * unauthorized if the authorization check fails
- *
- * @param dn
- * @return
- */
- PortAuthorizationResult checkUserAuthorization(String dn);
-
- /**
- * Receives data from the given stream
- *
- * @param peer
- * @param serverProtocol
- * @param requestHeaders
- *
- * @return the number of FlowFiles received
- * @throws org.apache.nifi.remote.exception.NotAuthorizedException
- * @throws org.apache.nifi.remote.exception.BadRequestException
- * @throws org.apache.nifi.remote.exception.RequestExpiredException
- */
- int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
-
- /**
- * Transfers data to the given stream
- *
- * @param peer
- * @param serverProtocol
- * @param requestHeaders
- *
- * @return the number of FlowFiles transferred
- * @throws org.apache.nifi.remote.exception.NotAuthorizedException
- * @throws org.apache.nifi.remote.exception.BadRequestException
- * @throws org.apache.nifi.remote.exception.RequestExpiredException
- */
- int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
deleted file mode 100644
index 56432d5..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public enum TransferDirection {
-
- SEND,
- RECEIVE;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
deleted file mode 100644
index bfccd98..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public interface VersionedRemoteResource {
-
- VersionNegotiator getVersionNegotiator();
-
- String getResourceName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
deleted file mode 100644
index b4206b3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.codec;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.VersionedRemoteResource;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-/**
- * <p>
- * Provides a mechanism for encoding and decoding FlowFiles as streams so that
- * they can be transferred remotely.
- * </p>
- */
-public interface FlowFileCodec extends VersionedRemoteResource {
-
- /**
- * Returns a List of all versions that this codec is able to support, in the
- * order that they are preferred by the codec
- *
- * @return
- */
- public List<Integer> getSupportedVersions();
-
- /**
- * Encodes a FlowFile and its content as a single stream of data and writes
- * that stream to the output. If checksum is not null, it will be calculated
- * as the stream is read
- *
- * @param flowFile the FlowFile to encode
- * @param session a session that can be used to transactionally create and
- * transfer flow files
- * @param outStream the stream to write the data to
- *
- * @return the updated FlowFile
- *
- * @throws IOException
- */
- FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException;
-
- /**
- * Decodes the contents of the InputStream, interpreting the data to
- * determine the next FlowFile's attributes and content, as well as their
- * destinations. If not null, checksum will be used to calculate the
- * checksum as the data is read.
- *
- * @param stream an InputStream containing FlowFiles' contents, attributes,
- * and destinations
- * @param session
- *
- * @return the FlowFile that was created, or <code>null</code> if the stream
- * was out of data
- *
- * @throws IOException
- * @throws ProtocolException if the input is malformed
- */
- FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
deleted file mode 100644
index f6c2f4f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class BadRequestException extends Exception {
-
- private static final long serialVersionUID = -8034602852256106560L;
-
- public BadRequestException(final String message) {
- super(message);
- }
-
- public BadRequestException(final Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
deleted file mode 100644
index b61fc65..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class HandshakeException extends Exception {
-
- private static final long serialVersionUID = 178192341908726L;
-
- public HandshakeException(final String message) {
- super(message);
- }
-
- public HandshakeException(final Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
deleted file mode 100644
index 24ff3a5..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class NotAuthorizedException extends Exception {
-
- private static final long serialVersionUID = 2952623568114035498L;
-
- public NotAuthorizedException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
deleted file mode 100644
index af0f467..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class PortNotRunningException extends Exception {
-
- private static final long serialVersionUID = -2790940982005516375L;
-
- public PortNotRunningException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
deleted file mode 100644
index 0f50b98..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class ProtocolException extends Exception {
-
- private static final long serialVersionUID = 5763900324505818495L;
-
- public ProtocolException(final String message, final Throwable cause) {
- super(message, cause);
- }
-
- public ProtocolException(final String message) {
- super(message);
- }
-
- public ProtocolException(final Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
deleted file mode 100644
index dd675b3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-/**
- * Used to indicate that by the time the request was serviced, it had already
- * expired
- */
-public class RequestExpiredException extends Exception {
-
- private static final long serialVersionUID = -7037025330562827852L;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
deleted file mode 100644
index e6a0fe7..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class UnknownPortException extends Exception {
-
- private static final long serialVersionUID = -2790940982005516375L;
-
- public UnknownPortException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
deleted file mode 100644
index 32274eb..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.VersionedRemoteResource;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-
-public interface ClientProtocol extends VersionedRemoteResource {
-
- void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException;
-
- Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException;
-
- FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
-
- void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
-
- void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
-
- void shutdown(Peer peer) throws IOException, ProtocolException;
-
- boolean isReadyForFileTransfer();
-
- /**
- * returns <code>true</code> if remote instance indicates that the port is
- * invalid
- *
- * @return
- * @throws IllegalStateException if a handshake has not successfully
- * completed
- */
- boolean isPortInvalid() throws IllegalStateException;
-
- /**
- * returns <code>true</code> if remote instance indicates that the port is
- * unknown
- *
- * @return
- * @throws IllegalStateException if a handshake has not successfully
- * completed
- */
- boolean isPortUnknown();
-
- /**
- * returns <code>true</code> if remote instance indicates that the port's
- * destination is full
- *
- * @return
- * @throws IllegalStateException if a handshake has not successfully
- * completed
- */
- boolean isDestinationFull();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
deleted file mode 100644
index d2e2946..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public interface CommunicationsInput {
-
- InputStream getInputStream() throws IOException;
-
- long getBytesRead();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
deleted file mode 100644
index 95cab29..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public interface CommunicationsOutput {
-
- OutputStream getOutputStream() throws IOException;
-
- long getBytesWritten();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
deleted file mode 100644
index d009cec..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface CommunicationsSession extends Closeable {
-
- public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
-
- CommunicationsInput getInput();
-
- CommunicationsOutput getOutput();
-
- void setTimeout(int millis) throws IOException;
-
- int getTimeout() throws IOException;
-
- void setUri(String uri);
-
- String getUri();
-
- String getUserDn();
-
- void setUserDn(String dn);
-
- boolean isDataAvailable();
-
- long getBytesWritten();
-
- long getBytesRead();
-
- /**
- * Asynchronously interrupts this CommunicationsSession. Implementations must ensure
- * that they stop sending and receiving data as soon as possible after this
- * method has been called, even if doing so results in sending only partial
- * data to the peer. This will usually result in the peer throwing a
- * SocketTimeoutException.
- */
- void interrupt();
-
- /**
- * Returns <code>true</code> if the connection is closed, <code>false</code>
- * otherwise.
- *
- * @return
- */
- boolean isClosed();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
deleted file mode 100644
index 41334fe..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public enum RequestType {
-
- NEGOTIATE_FLOWFILE_CODEC,
- REQUEST_PEER_LIST,
- SEND_FLOWFILES,
- RECEIVE_FLOWFILES,
- SHUTDOWN;
-
- public void writeRequestType(final DataOutputStream dos) throws IOException {
- dos.writeUTF(name());
- }
-
- public static RequestType readRequestType(final DataInputStream dis) throws IOException {
- final String requestTypeVal = dis.readUTF();
- try {
- return RequestType.valueOf(requestTypeVal);
- } catch (final Exception e) {
- throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
deleted file mode 100644
index 0d18f2e..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-
-import org.apache.nifi.cluster.NodeInformant;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.RootGroupPort;
-import org.apache.nifi.remote.VersionedRemoteResource;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.BadRequestException;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.ProtocolException;
-
-public interface ServerProtocol extends VersionedRemoteResource {
-
- /**
- *
- * @param rootGroup
- */
- void setRootProcessGroup(ProcessGroup rootGroup);
-
- RootGroupPort getPort();
-
- /**
- * Optional operation. Sets the NodeInformant to use in this Protocol, if a
- * NodeInformant is supported. Otherwise, throws
- * UnsupportedOperationException
- *
- * @param nodeInformant
- */
- void setNodeInformant(NodeInformant nodeInformant);
-
- /**
- * Receives the handshake from the Peer
- *
- * @param peer
- * @throws IOException
- * @throws HandshakeException
- */
- void handshake(Peer peer) throws IOException, HandshakeException;
-
- /**
- * Returns <code>true</code> if the handshaking process was completed
- * successfully, <code>false</code> if either the handshaking process has
- * not happened or the handshake failed
- *
- * @return
- */
- boolean isHandshakeSuccessful();
-
- /**
- * Negotiates the FlowFileCodec that is to be used for transferring
- * FlowFiles
- *
- * @param peer
- * @return
- * @throws IOException
- * @throws BadRequestException
- */
- FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
-
- /**
- * Returns the codec that has already been negotiated by this Protocol, if
- * any.
- *
- * @return
- */
- FlowFileCodec getPreNegotiatedCodec();
-
- /**
- * Reads the Request Type of the next request from the Peer
- *
- * @return the RequestType that the peer would like to happen - or null, if
- * no data available
- */
- RequestType getRequestType(Peer peer) throws IOException;
-
- /**
- * Sends FlowFiles to the specified peer
- *
- * @param peer
- * @param context
- * @param session
- * @param codec
- *
- * @return the number of FlowFiles transferred
- */
- int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
-
- /**
- * Receives FlowFiles from the specified peer
- *
- * @param peer
- * @param context
- * @param session
- * @param codec
- * @throws IOException
- *
- * @return the number of FlowFiles received
- * @throws ProtocolException
- */
- int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
-
- /**
- * Returns the number of milliseconds after a request is received for which
- * the request is still valid. A value of 0 indicates that the request will
- * not expire.
- *
- * @return
- */
- long getRequestExpiration();
-
- /**
- * Sends a list of all nodes in the cluster to the specified peer. If not in
- * a cluster, sends info about itself
- *
- * @param peer
- */
- void sendPeerList(Peer peer) throws IOException;
-
- void shutdown(Peer peer);
-
- boolean isShutdown();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/.gitignore b/nifi/nar-bundles/framework-bundle/framework/core/.gitignore
deleted file mode 100755
index ea8c4bf..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/pom.xml b/nifi/nar-bundles/framework-bundle/framework/core/pom.xml
deleted file mode 100644
index e8214bf..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/pom.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-framework-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>framework-core</artifactId>
- <packaging>jar</packaging>
- <name>NiFi Framework Core</name>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>core-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-expression-language</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-properties</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>site-to-site</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>framework-cluster-protocol</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-logging-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>client-dto</artifactId>
- </dependency>
- <dependency>
- <groupId>org.quartz-scheduler</groupId>
- <artifactId>quartz</artifactId>
- </dependency>
- <dependency>
- <groupId>com.h2database</groupId>
- <artifactId>h2</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jasypt</groupId>
- <artifactId>jasypt</artifactId>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk16</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-security</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-beans</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>data-provenance-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>wali</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
deleted file mode 100644
index 1249657..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Set;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.jaxb.BulletinAdapter;
-import org.apache.nifi.reporting.Bulletin;
-
-/**
- * The payload of the bulletins.
- *
- * @author unattributed
- */
-@XmlRootElement
-public class BulletinsPayload {
-
- private static final JAXBContext JAXB_CONTEXT;
-
- static {
- try {
- JAXB_CONTEXT = JAXBContext.newInstance(BulletinsPayload.class);
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to create JAXBContext.");
- }
- }
-
- private Set<Bulletin> bulletins;
-
- @XmlJavaTypeAdapter(BulletinAdapter.class)
- public Set<Bulletin> getBulletins() {
- return bulletins;
- }
-
- public void setBulletins(final Set<Bulletin> bulletins) {
- this.bulletins = bulletins;
- }
-
- public byte[] marshal() throws ProtocolException {
- final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
- marshal(this, payloadBytes);
- return payloadBytes.toByteArray();
- }
-
- public static void marshal(final BulletinsPayload payload, final OutputStream os) throws ProtocolException {
- try {
- final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
- marshaller.marshal(payload, os);
- } catch (final JAXBException je) {
- throw new ProtocolException(je);
- }
- }
-
- public static BulletinsPayload unmarshal(final InputStream is) throws ProtocolException {
- try {
- final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
- return (BulletinsPayload) unmarshaller.unmarshal(is);
- } catch (final JAXBException je) {
- throw new ProtocolException(je);
- }
- }
-
- public static BulletinsPayload unmarshal(final byte[] bytes) throws ProtocolException {
- try {
- final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
- return (BulletinsPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes));
- } catch (final JAXBException je) {
- throw new ProtocolException(je);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
deleted file mode 100644
index 986e904..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-/**
- * Represents the exceptional case when connection to the cluster fails.
- *
- * @author unattributed
- */
-public class ConnectionException extends RuntimeException {
-
- private static final long serialVersionUID = -1378294897231234028L;
-
- public ConnectionException() {
- }
-
- public ConnectionException(String msg) {
- super(msg);
- }
-
- public ConnectionException(Throwable cause) {
- super(cause);
- }
-
- public ConnectionException(String msg, Throwable cause) {
- super(msg, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
deleted file mode 100644
index 55707f3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-/**
- * Represents the exceptional case when disconnection from the cluster fails.
- *
- * @author unattributed
- */
-public class DisconnectionException extends RuntimeException {
-
- private static final long serialVersionUID = 6648876367997026125L;
-
- public DisconnectionException() {
- }
-
- public DisconnectionException(String msg) {
- super(msg);
- }
-
- public DisconnectionException(Throwable cause) {
- super(cause);
- }
-
- public DisconnectionException(String msg, Throwable cause) {
- super(msg, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
deleted file mode 100644
index 093b238..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.controller.Counter;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.jaxb.CounterAdapter;
-
-/**
- * The payload of the heartbeat. The payload contains status to inform the
- * cluster manager of the current workload of this node.
- *
- * @author unattributed
- */
-@XmlRootElement
-public class HeartbeatPayload {
-
- private static final JAXBContext JAXB_CONTEXT;
-
- static {
- try {
- JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class);
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to create JAXBContext.");
- }
- }
-
- private List<Counter> counters;
- private ProcessGroupStatus processGroupStatus;
- private int activeThreadCount;
- private long totalFlowFileCount;
- private long totalFlowFileBytes;
- private SystemDiagnostics systemDiagnostics;
- private Integer siteToSitePort;
- private boolean siteToSiteSecure;
- private long systemStartTime;
-
- @XmlJavaTypeAdapter(CounterAdapter.class)
- public List<Counter> getCounters() {
- return counters;
- }
-
- public void setCounters(final List<Counter> counters) {
- this.counters = counters;
- }
-
- public int getActiveThreadCount() {
- return activeThreadCount;
- }
-
- public void setActiveThreadCount(final int activeThreadCount) {
- this.activeThreadCount = activeThreadCount;
- }
-
- public long getTotalFlowFileCount() {
- return totalFlowFileCount;
- }
-
- public void setTotalFlowFileCount(final long totalFlowFileCount) {
- this.totalFlowFileCount = totalFlowFileCount;
- }
-
- public long getTotalFlowFileBytes() {
- return totalFlowFileBytes;
- }
-
- public void setTotalFlowFileBytes(final long totalFlowFileBytes) {
- this.totalFlowFileBytes = totalFlowFileBytes;
- }
-
- public ProcessGroupStatus getProcessGroupStatus() {
- return processGroupStatus;
- }
-
- public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) {
- this.processGroupStatus = processGroupStatus;
- }
-
- public SystemDiagnostics getSystemDiagnostics() {
- return systemDiagnostics;
- }
-
- public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) {
- this.systemDiagnostics = systemDiagnostics;
- }
-
- public boolean isSiteToSiteSecure() {
- return siteToSiteSecure;
- }
-
- public void setSiteToSiteSecure(final boolean secure) {
- this.siteToSiteSecure = secure;
- }
-
- public Integer getSiteToSitePort() {
- return siteToSitePort;
- }
-
- public void setSiteToSitePort(final Integer port) {
- this.siteToSitePort = port;
- }
-
- public long getSystemStartTime() {
- return systemStartTime;
- }
-
- public void setSystemStartTime(final long systemStartTime) {
- this.systemStartTime = systemStartTime;
- }
-
- public byte[] marshal() throws ProtocolException {
- final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
- marshal(this, payloadBytes);
- return payloadBytes.toByteArray();
- }
-
- public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException {
- try {
- final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
- marshaller.marshal(payload, os);
- } catch (final JAXBException je) {
- throw new ProtocolException(je);
- }
- }
-
- public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException {
- try {
- final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
- return (HeartbeatPayload) unmarshaller.unmarshal(is);
- } catch (final JAXBException je) {
- throw new ProtocolException(je);
- }
- }
-
- public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException {
- try {
- final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
- return (HeartbeatPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes));
- } catch (final JAXBException je) {
- throw new ProtocolException(je);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
deleted file mode 100644
index f0739c2..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.connectable;
-
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractPort;
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-
-/**
- * Provides a mechanism by which <code>FlowFile</code>s can be transferred into
- * and out of a <code>ProcessGroup</code> to and/or from another
- * <code>ProcessGroup</code> within the same instance of NiFi.
- */
-public class LocalPort extends AbstractPort {
-
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
- private final Lock writeLock = rwLock.writeLock();
-
- public LocalPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
- super(id, name, processGroup, type, scheduler);
- }
-
- @Override
- public boolean isValid() {
- return !getConnections(Relationship.ANONYMOUS).isEmpty();
- }
-
- @Override
- public Collection<ValidationResult> getValidationErrors() {
- final Collection<ValidationResult> validationErrors = new ArrayList<>();
- if (!isValid()) {
- final ValidationResult error = new ValidationResult.Builder()
- .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
- .subject(String.format("Port '%s'", getName()))
- .valid(false)
- .build();
- validationErrors.add(error);
- }
- return validationErrors;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- readLock.lock();
- try {
- final List<FlowFile> flowFiles = session.get(10);
- if (flowFiles.isEmpty()) {
- context.yield();
- } else {
- session.transfer(flowFiles, Relationship.ANONYMOUS);
- }
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public void updateConnection(final Connection connection) throws IllegalStateException {
- writeLock.lock();
- try {
- super.updateConnection(connection);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void addConnection(final Connection connection) throws IllegalArgumentException {
- writeLock.lock();
- try {
- super.addConnection(connection);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
- writeLock.lock();
- try {
- super.removeConnection(connection);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public Set<Connection> getConnections() {
- readLock.lock();
- try {
- return super.getConnections();
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public Set<Connection> getConnections(Relationship relationship) {
- readLock.lock();
- try {
- return super.getConnections(relationship);
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public List<Connection> getIncomingConnections() {
- readLock.lock();
- try {
- return super.getIncomingConnections();
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public boolean hasIncomingConnection() {
- readLock.lock();
- try {
- return super.hasIncomingConnection();
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public boolean isTriggerWhenEmpty() {
- return false;
- }
-
- @Override
- public SchedulingStrategy getSchedulingStrategy() {
- return SchedulingStrategy.TIMER_DRIVEN;
- }
-
- @Override
- public boolean isSideEffectFree() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
deleted file mode 100644
index 1d723b5..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.connectable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.nifi.controller.FlowFileQueue;
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.StandardFlowFileQueue;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.util.NiFiProperties;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * Models a connection between connectable components. A connection may contain
- * one or more relationships that map the source component to the destination
- * component.
- * <p>
- * The identifier, source, queue and scheduler are assigned once in the
- * constructor; name, bend points, process group, destination, relationships,
- * label index and z-index live in atomic holders and can be replaced through
- * the setters. Instances are obtained from {@link Builder#build()}.
- */
-public final class StandardConnection implements Connection {
-
-    private final String id;
-    private final AtomicReference<ProcessGroup> processGroup;
-    private final AtomicReference<String> name;
-    private final AtomicReference<List<Position>> bendPoints;
-    private final Connectable source;
-    private final AtomicReference<Connectable> destination;
-    private final AtomicReference<Collection<Relationship>> relationships;
-    private final StandardFlowFileQueue flowFileQueue;
-    private final AtomicInteger labelIndex = new AtomicInteger(1);
-    private final AtomicLong zIndex = new AtomicLong(0L);
-    private final ProcessScheduler scheduler;
-    private final int hashCode;
-
-    private StandardConnection(final Builder builder) {
-        id = builder.id;
-        // Assigned before 'this' is handed to the queue constructor below, so the hash is
-        // already in place should the queue hash this connection while being constructed.
-        hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
-        name = new AtomicReference<>(builder.name);
-        bendPoints = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(builder.bendPoints)));
-        processGroup = new AtomicReference<>(builder.processGroup);
-        source = builder.source;
-        destination = new AtomicReference<>(builder.destination);
-        relationships = new AtomicReference<>(unmodifiableCopy(builder.relationships));
-        scheduler = builder.scheduler;
-        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold());
-    }
-
-    /**
-     * Copies the given relationships into an unmodifiable list.
-     * <p>
-     * A list wrapper is used rather than {@code Collections.unmodifiableCollection}
-     * because the latter compares by identity only, which would prevent the
-     * "nothing changed" check in {@link #setRelationships(Collection)} from ever
-     * matching by content. The copy also keeps the stored value independent of
-     * the caller's (or the builder's) collection.
-     *
-     * @param relationships the relationships to copy
-     * @return an unmodifiable list holding the same elements in iteration order
-     */
-    private static Collection<Relationship> unmodifiableCopy(final Collection<Relationship> relationships) {
-        return Collections.unmodifiableList(new ArrayList<>(relationships));
-    }
-
-    /**
-     * @return the process group currently associated with this connection
-     */
-    public ProcessGroup getProcessGroup() {
-        return processGroup.get();
-    }
-
-    /**
-     * @return the identifier assigned when this connection was built
-     */
-    public String getIdentifier() {
-        return id;
-    }
-
-    /**
-     * @return the current name of this connection
-     */
-    public String getName() {
-        return name.get();
-    }
-
-    /**
-     * @param name the new name of this connection
-     */
-    public void setName(final String name) {
-        this.name.set(name);
-    }
-
-    /**
-     * @return the current bend points as an unmodifiable list
-     */
-    @Override
-    public List<Position> getBendPoints() {
-        return bendPoints.get();
-    }
-
-    /**
-     * Replaces the bend points with an unmodifiable copy of the given list.
-     *
-     * @param position the new bend points
-     */
-    @Override
-    public void setBendPoints(final List<Position> position) {
-        this.bendPoints.set(Collections.unmodifiableList(new ArrayList<>(position)));
-    }
-
-    /**
-     * @return the current label index (initially 1)
-     */
-    public int getLabelIndex() {
-        return labelIndex.get();
-    }
-
-    /**
-     * @param labelIndex the new label index
-     */
-    public void setLabelIndex(final int labelIndex) {
-        this.labelIndex.set(labelIndex);
-    }
-
-    /**
-     * @return the current z-index (initially 0)
-     */
-    @Override
-    public long getZIndex() {
-        return zIndex.get();
-    }
-
-    /**
-     * @param zIndex the new z-index
-     */
-    @Override
-    public void setZIndex(final long zIndex) {
-        this.zIndex.set(zIndex);
-    }
-
-    /**
-     * @return the source component given to the builder
-     */
-    public Connectable getSource() {
-        return source;
-    }
-
-    /**
-     * @return the current destination component
-     */
-    public Connectable getDestination() {
-        return destination.get();
-    }
-
-    /**
-     * @return the current relationships as an unmodifiable collection
-     */
-    public Collection<Relationship> getRelationships() {
-        return relationships.get();
-    }
-
-    /**
-     * @return the queue created for this connection in the constructor
-     */
-    public FlowFileQueue getFlowFileQueue() {
-        return flowFileQueue;
-    }
-
-    /**
-     * Associates this connection with the given process group.
-     *
-     * @param newGroup the new process group
-     */
-    public void setProcessGroup(final ProcessGroup newGroup) {
-        // AtomicReference.set cannot fail, so no rollback handling is needed here.
-        this.processGroup.set(newGroup);
-    }
-
-    /**
-     * Replaces the relationships of this connection and notifies the source
-     * through {@code updateConnection}. Does nothing when the stored
-     * relationships already equal the given collection.
-     *
-     * @param newRelationships the relationships to store
-     * @throws IllegalStateException if the relationships differ and the source is running
-     */
-    public void setRelationships(final Collection<Relationship> newRelationships) {
-        final Collection<Relationship> currentRelationships = relationships.get();
-        if (currentRelationships.equals(newRelationships)) {
-            return;
-        }
-
-        if (getSource().isRunning()) {
-            throw new IllegalStateException("Cannot update the relationships for Connection because the source of the Connection is running");
-        }
-
-        try {
-            this.relationships.set(unmodifiableCopy(newRelationships));
-            getSource().updateConnection(this);
-        } catch (final RuntimeException e) {
-            // Restore the previous value if the copy or the source notification fails.
-            this.relationships.set(currentRelationships);
-            throw e;
-        }
-    }
-
-    /**
-     * Moves this connection to a new destination: detaches it from the current
-     * destination, stores the new one, notifies the source, attaches the
-     * connection to the new destination and registers an event for it with the
-     * scheduler. Does nothing when the current destination equals the new one.
-     *
-     * @param newDestination the new destination component
-     * @throws IllegalStateException if the current destination is running and is
-     *             neither a {@code Funnel} nor a {@code LocalPort}
-     * @throws NullPointerException if {@code newDestination} is null
-     */
-    public void setDestination(final Connectable newDestination) {
-        final Connectable previousDestination = destination.get();
-        if (previousDestination.equals(newDestination)) {
-            return;
-        }
-
-        if (previousDestination.isRunning() && !(previousDestination instanceof Funnel || previousDestination instanceof LocalPort)) {
-            throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
-        }
-
-        // Fail before anything is mutated; otherwise the connection would already have been
-        // detached from the previous destination by the time the null is dereferenced.
-        if (newDestination == null) {
-            throw new NullPointerException("Cannot change destination of Connection to null");
-        }
-
-        try {
-            previousDestination.removeConnection(this);
-            this.destination.set(newDestination);
-            getSource().updateConnection(this);
-
-            newDestination.addConnection(this);
-            scheduler.registerEvent(newDestination);
-        } catch (final RuntimeException e) {
-            // NOTE(review): only the destination reference is restored here; the connection is
-            // not re-attached to the previous destination if the failure occurred after
-            // removeConnection succeeded - confirm whether callers rely on that.
-            this.destination.set(previousDestination);
-            throw e;
-        }
-    }
-
-    /**
-     * Locks the underlying queue.
-     */
-    @Override
-    public void lock() {
-        flowFileQueue.lock();
-    }
-
-    /**
-     * Unlocks the underlying queue.
-     */
-    @Override
-    public void unlock() {
-        flowFileQueue.unlock();
-    }
-
-    /**
-     * Polls the underlying queue with the given filter.
-     *
-     * @param filter the filter handed to the queue
-     * @param expiredRecords the set handed to the queue for expired records
-     * @return whatever the queue's {@code poll} returns
-     */
-    @Override
-    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
-        return flowFileQueue.poll(filter, expiredRecords);
-    }
-
-    /**
-     * Two connections are equal when their identifiers are equal.
-     */
-    @Override
-    public boolean equals(final Object other) {
-        if (!(other instanceof Connection)) {
-            return false;
-        }
-        final Connection con = (Connection) other;
-        return new EqualsBuilder().append(id, con.getIdentifier()).isEquals();
-    }
-
-    /**
-     * @return the hash derived from the identifier in the constructor
-     */
-    @Override
-    public int hashCode() {
-        return hashCode;
-    }
-
-    @Override
-    public String toString() {
-        // The closing bracket was previously missing, leaving the "Connection[" prefix unbalanced.
-        return "Connection[ID=" + id + ",Name=" + name.get() + ",Source=" + getSource() + ",Destination=" + getDestination() + ",Relationships=" + getRelationships() + "]";
-    }
-
-    /**
-     * Adds the given FlowFile to this connection's queue.
-     *
-     * @param flowFile the FlowFile record to put on the queue
-     */
-    @Override
-    public void enqueue(final FlowFileRecord flowFile) {
-        flowFileQueue.put(flowFile);
-    }
-
-    /**
-     * Adds all of the given FlowFiles to this connection's queue.
-     *
-     * @param flowFiles the FlowFile records to put on the queue
-     */
-    @Override
-    public void enqueue(final Collection<FlowFileRecord> flowFiles) {
-        flowFileQueue.putAll(flowFiles);
-    }
-
-    /**
-     * Builder for {@link StandardConnection}. A random UUID is used as the
-     * identifier unless {@link #id(String)} is called; source and destination
-     * are mandatory.
-     */
-    public static class Builder {
-
-        private final ProcessScheduler scheduler;
-
-        private String id = UUID.randomUUID().toString();
-        private String name;
-        private List<Position> bendPoints = new ArrayList<>();
-        private ProcessGroup processGroup;
-        private Connectable source;
-        private Connectable destination;
-        private Collection<Relationship> relationships;
-
-        /**
-         * @param scheduler the scheduler handed to the connection and its queue
-         */
-        public Builder(final ProcessScheduler scheduler) {
-            this.scheduler = scheduler;
-        }
-
-        public Builder id(final String id) {
-            this.id = id;
-            return this;
-        }
-
-        public Builder source(final Connectable source) {
-            this.source = source;
-            return this;
-        }
-
-        public Builder processGroup(final ProcessGroup group) {
-            this.processGroup = group;
-            return this;
-        }
-
-        public Builder destination(final Connectable destination) {
-            this.destination = destination;
-            return this;
-        }
-
-        /**
-         * Stores a copy of the given relationships.
-         */
-        public Builder relationships(final Collection<Relationship> relationships) {
-            this.relationships = new ArrayList<>(relationships);
-            return this;
-        }
-
-        public Builder name(final String name) {
-            this.name = name;
-            return this;
-        }
-
-        /**
-         * Replaces any bend points collected so far with the given ones.
-         */
-        public Builder bendPoints(final List<Position> bendPoints) {
-            this.bendPoints.clear();
-            this.bendPoints.addAll(bendPoints);
-            return this;
-        }
-
-        public Builder addBendPoint(final Position bendPoint) {
-            bendPoints.add(bendPoint);
-            return this;
-        }
-
-        /**
-         * Validates the collected values and creates the connection.
-         *
-         * @return the new connection
-         * @throws IllegalStateException if no source or destination was given, or
-         *             if the source is a processor and no relationships were given
-         */
-        public StandardConnection build() {
-            if (source == null) {
-                throw new IllegalStateException("Cannot build a Connection without a Source");
-            }
-            if (destination == null) {
-                throw new IllegalStateException("Cannot build a Connection without a Destination");
-            }
-
-            if (relationships == null) {
-                relationships = new ArrayList<>();
-            }
-
-            if (relationships.isEmpty()) {
-                // ensure relationships have been specified for processors, otherwise the anonymous relationship is used
-                if (source.getConnectableType() == ConnectableType.PROCESSOR) {
-                    throw new IllegalStateException("Cannot build a Connection without any relationships");
-                }
-                relationships.add(Relationship.ANONYMOUS);
-            }
-
-            return new StandardConnection(this);
-        }
-    }
-
-    @Override
-    public void verifyCanUpdate() {
-        // StandardConnection can always be updated
-    }
-
-    /**
-     * Checks that this connection may be deleted.
-     *
-     * @throws IllegalStateException if the queue is not empty, or if the source
-     *             is running and is not a funnel
-     */
-    @Override
-    public void verifyCanDelete() {
-        if (!flowFileQueue.isEmpty()) {
-            throw new IllegalStateException("Queue not empty for " + this);
-        }
-
-        if (source.isRunning() && !ConnectableType.FUNNEL.equals(source.getConnectableType())) {
-            throw new IllegalStateException("Source of Connection (" + source + ") is running");
-        }
-    }
-}