You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/22 05:39:50 UTC
[6/7] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 629032a..5c4ce55 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -36,358 +36,383 @@ import org.apache.nifi.remote.protocol.DataPacket;
/**
* <p>
- * The SiteToSiteClient provides a mechanism for sending data to a remote instance of NiFi
- * (or NiFi cluster) and retrieving data from a remote instance of NiFi (or NiFi cluster).
+ * The SiteToSiteClient provides a mechanism for sending data to a remote
+ * instance of NiFi (or NiFi cluster) and retrieving data from a remote instance
+ * of NiFi (or NiFi cluster).
* </p>
- *
+ *
* <p>
- * When configuring the client via the {@link SiteToSiteClient.Builder}, the Builder must
- * be provided the URL of the remote NiFi instance. If the URL points to a standalone instance
- * of NiFi, all interaction will take place with that instance of NiFi. However, if the URL
- * points to the NiFi Cluster Manager of a cluster, the client will automatically handle load
- * balancing the transactions across the different nodes in the cluster.
+ * When configuring the client via the {@link SiteToSiteClient.Builder}, the
+ * Builder must be provided the URL of the remote NiFi instance. If the URL
+ * points to a standalone instance of NiFi, all interaction will take place with
+ * that instance of NiFi. However, if the URL points to the NiFi Cluster Manager
+ * of a cluster, the client will automatically handle load balancing the
+ * transactions across the different nodes in the cluster.
* </p>
- *
+ *
* <p>
- * The SiteToSiteClient provides a {@link Transaction} through which all interaction with the
- * remote instance takes place. After data has been exchanged or it is determined that no data
- * is available, the Transaction can then be canceled (via the {@link Transaction#cancel(String)}
- * method) or can be completed (via the {@link Transaction#complete(boolean)} method).
+ * The SiteToSiteClient provides a {@link Transaction} through which all
+ * interaction with the remote instance takes place. After data has been
+ * exchanged or it is determined that no data is available, the Transaction can
+ * then be canceled (via the {@link Transaction#cancel(String)} method) or can
+ * be completed (via the {@link Transaction#complete(boolean)} method).
* </p>
- *
+ *
* <p>
- * An instance of SiteToSiteClient can be obtained by constructing a new instance of the
- * {@link SiteToSiteClient.Builder} class, calling the appropriate methods to configured the
- * client as desired, and then calling the {@link SiteToSiteClient.Builder#build() build()} method.
+ * An instance of SiteToSiteClient can be obtained by constructing a new
+ * instance of the {@link SiteToSiteClient.Builder} class, calling the
+ * appropriate methods to configure the client as desired, and then calling the
+ * {@link SiteToSiteClient.Builder#build() build()} method.
* </p>
*
* <p>
- * The SiteToSiteClient itself is immutable once constructed and is thread-safe. Many threads can
- * share access to the same client. However, the {@link Transaction} that is created by the client
- * is not thread safe and should not be shared among threads.
+ * The SiteToSiteClient itself is immutable once constructed and is thread-safe.
+ * Many threads can share access to the same client. However, the
+ * {@link Transaction} that is created by the client is not thread safe and
+ * should not be shared among threads.
* </p>
*/
public interface SiteToSiteClient extends Closeable {
- /**
- * <p>
- * Creates a new Transaction that can be used to either send data to a remote NiFi instance
- * or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument.
- * </p>
- *
- * <p>
- * <b>Note:</b> If all of the nodes are penalized (See {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then
- * this method will return <code>null</code>.
- * </p>
- *
- * @param direction specifies which direction the data should be transferred. A value of {@link TransferDirection#SEND}
- * indicates that this Transaction will send data to the remote instance; a value of {@link TransferDirection#RECEIVE} indicates
- * that this Transaction will be used to receive data from the remote instance.
- *
- * @return a Transaction to use for sending or receiving data, or <code>null</code> if all nodes are penalized.
- * @throws IOException
- */
- Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException;
-
- /**
- * <p>
- * Returns {@code true} if site-to-site communications with the remote instance are secure,
- * {@code false} if site-to-site communications with the remote instance are not secure. Whether or not
- * communications are secure depends on the server, not the client.
- * </p>
- *
- * <p>
- * In order to determine whether the server is configured for secure communications, the client may have
- * to query the server's RESTful interface. Doing so could result in an IOException.
- * </p>
- *
- * @return
- * @throws IOException if unable to query the remote instance's RESTful interface or if the remote
- * instance is not configured to allow site-to-site communications
- */
- boolean isSecure() throws IOException;
-
- /**
- * <p>
- * Returns the configuration object that was built by the Builder
- * </p>
- * @return
- */
- SiteToSiteClientConfig getConfig();
-
- /**
- * <p>
- * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient.
- * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If
- * a change in configuration should be desired, the client should be {@link Closeable#close() closed}
- * and a new client created.
- * </p>
- */
- public static class Builder implements Serializable {
+ /**
+ * <p>
+ * Creates a new Transaction that can be used to either send data to a
+ * remote NiFi instance or receive data from a remote NiFi instance,
+ * depending on the value passed for the {@code direction} argument.
+ * </p>
+ *
+ * <p>
+ * <b>Note:</b> If all of the nodes are penalized (See
+ * {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then this method
+ * will return <code>null</code>.
+ * </p>
+ *
+ * @param direction specifies which direction the data should be
+ * transferred. A value of {@link TransferDirection#SEND} indicates that
+ * this Transaction will send data to the remote instance; a value of
+ * {@link TransferDirection#RECEIVE} indicates that this Transaction will be
+ * used to receive data from the remote instance.
+ *
+ * @return a Transaction to use for sending or receiving data, or
+ * <code>null</code> if all nodes are penalized.
+ * @throws org.apache.nifi.remote.exception.HandshakeException he
+ * @throws org.apache.nifi.remote.exception.PortNotRunningException pnre
+ * @throws IOException ioe
+ * @throws org.apache.nifi.remote.exception.UnknownPortException upe
+ */
+ Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException;
+
+ /**
+ * <p>
+ * In order to determine whether the server is configured for secure
+ * communications, the client may have to query the server's RESTful
+ * interface. Doing so could result in an IOException.
+ * </p>
+ *
+ * @return {@code true} if site-to-site communications with the remote
+ * instance are secure, {@code false} if site-to-site communications with
+ * the remote instance are not secure. Whether or not communications are
+ * secure depends on the server, not the client
+ * @throws IOException if unable to query the remote instance's RESTful
+ * interface or if the remote instance is not configured to allow
+ * site-to-site communications
+ */
+ boolean isSecure() throws IOException;
+
+ /**
+ *
+ * @return the configuration object that was built by the Builder
+ */
+ SiteToSiteClientConfig getConfig();
+
+ /**
+ * <p>
+ * The Builder is the mechanism by which all configuration is passed to the
+ * SiteToSiteClient. Once constructed, the SiteToSiteClient cannot be
+ * reconfigured (i.e., it is immutable). If a change in configuration should
+ * be desired, the client should be {@link Closeable#close() closed} and a
+ * new client created.
+ * </p>
+ */
+ public static class Builder implements Serializable {
+
private static final long serialVersionUID = -4954962284343090219L;
-
+
private String url;
- private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
- private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
- private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
- private SSLContext sslContext;
- private EventReporter eventReporter;
- private File peerPersistenceFile;
- private boolean useCompression;
- private String portName;
- private String portIdentifier;
- private int batchCount;
- private long batchSize;
- private long batchNanos;
-
- /**
- * Populates the builder with values from the provided config
- * @param config
- * @return
- */
- public Builder fromConfig(final SiteToSiteClientConfig config) {
- this.url = config.getUrl();
- this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS);
- this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
- this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
- this.sslContext = config.getSslContext();
- this.eventReporter = config.getEventReporter();
- this.peerPersistenceFile = config.getPeerPersistenceFile();
- this.useCompression = config.isUseCompression();
- this.portName = config.getPortName();
- this.portIdentifier = config.getPortIdentifier();
- this.batchCount = config.getPreferredBatchCount();
- this.batchSize = config.getPreferredBatchSize();
- this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
-
- return this;
- }
-
- /**
- * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of
- * a NiFi cluster, data transfer to and from nodes will be automatically load balanced across
- * the different nodes.
- *
- * @param url
- * @return
- */
- public Builder url(final String url) {
- this.url = url;
- return this;
- }
-
- /**
- * Specifies the communications timeouts to use when interacting with the remote instances. The
- * default value is 30 seconds.
- *
- * @param timeout
- * @param unit
- * @return
- */
- public Builder timeout(final long timeout, final TimeUnit unit) {
- this.timeoutNanos = unit.toNanos(timeout);
- return this;
- }
-
- /**
- * Specifies the amount of time that a connection can remain idle in the connection pool before it
- * is "expired" and shutdown. The default value is 30 seconds.
- *
- * @param timeout
- * @param unit
- * @return
- */
- public Builder idleExpiration(final long timeout, final TimeUnit unit) {
- this.idleExpirationNanos = unit.toNanos(timeout);
- return this;
- }
-
- /**
- * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster
- * or the remote instance of NiFi if it is standalone), specifies how long the client should
- * wait before attempting to communicate with that node again. While a particular node is penalized,
- * all other nodes in the remote cluster (if any) will still be available for communication.
- * The default value is 3 seconds.
- *
- * @param period
- * @param unit
- * @return
- */
- public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) {
- this.penalizationNanos = unit.toNanos(period);
- return this;
- }
-
- /**
- * Specifies the SSL Context to use when communicating with the remote NiFi instance(s). If not
- * specified, communications will not be secure. The remote instance of NiFi always determines
- * whether or not Site-to-Site communications are secure (i.e., the client will always use
- * secure or non-secure communications, depending on what the server dictates).
- *
- * @param sslContext
- * @return
- */
- public Builder sslContext(final SSLContext sslContext) {
- this.sslContext = sslContext;
- return this;
- }
-
-
- /**
- * Provides an EventReporter that can be used by the client in order to report any events that
- * could be of interest when communicating with the remote instance. The EventReporter provided
- * must be threadsafe.
- *
- * @param eventReporter
- * @return
- */
- public Builder eventReporter(final EventReporter eventReporter) {
- this.eventReporter = eventReporter;
- return this;
- }
-
-
- /**
- * Specifies a file that the client can write to in order to persist the list of nodes in the
- * remote cluster and recover the list of nodes upon restart. This allows the client to function
- * if the remote Cluster Manager is unavailable, even after a restart of the client software.
- * If not specified, the list of nodes will not be persisted and a failure of the Cluster Manager
- * will result in not being able to communicate with the remote instance if a new client
- * is created.
- *
- * @param peerPersistenceFile
- * @return
- */
- public Builder peerPersistenceFile(final File peerPersistenceFile) {
- this.peerPersistenceFile = peerPersistenceFile;
- return this;
- }
-
- /**
- * Specifies whether or not data should be compressed before being transferred to or from the
- * remote instance.
- *
- * @param compress
- * @return
- */
- public Builder useCompression(final boolean compress) {
- this.useCompression = compress;
- return this;
- }
-
- /**
- * Specifies the name of the port to communicate with. Either the port name or the port identifier
- * must be specified.
- *
- * @param portName
- * @return
- */
- public Builder portName(final String portName) {
- this.portName = portName;
- return this;
- }
-
- /**
- * Specifies the unique identifier of the port to communicate with. If it is known, this is preferred over providing
- * the port name, as the port name may change.
- *
- * @param portIdentifier
- * @return
- */
- public Builder portIdentifier(final String portIdentifier) {
- this.portIdentifier = portIdentifier;
- return this;
- }
-
- /**
- * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
- * the client has the ability to request a particular batch size/duration. This method specifies
- * the preferred number of {@link DataPacket}s to include in a Transaction.
- *
- * @return
- */
- public Builder requestBatchCount(final int count) {
- this.batchCount = count;
- return this;
- }
-
- /**
- * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
- * the client has the ability to request a particular batch size/duration. This method specifies
- * the preferred number of bytes to include in a Transaction.
- *
- * @return
- */
- public Builder requestBatchSize(final long bytes) {
- this.batchSize = bytes;
- return this;
- }
-
+ private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
+ private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+ private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
+ private SSLContext sslContext;
+ private EventReporter eventReporter;
+ private File peerPersistenceFile;
+ private boolean useCompression;
+ private String portName;
+ private String portIdentifier;
+ private int batchCount;
+ private long batchSize;
+ private long batchNanos;
+
+ /**
+ * Populates the builder with values from the provided config
+ *
+ * @param config to start with
+ * @return the builder
+ */
+ public Builder fromConfig(final SiteToSiteClientConfig config) {
+ this.url = config.getUrl();
+ this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS);
+ this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+ this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
+ this.sslContext = config.getSslContext();
+ this.eventReporter = config.getEventReporter();
+ this.peerPersistenceFile = config.getPeerPersistenceFile();
+ this.useCompression = config.isUseCompression();
+ this.portName = config.getPortName();
+ this.portIdentifier = config.getPortIdentifier();
+ this.batchCount = config.getPreferredBatchCount();
+ this.batchSize = config.getPreferredBatchSize();
+ this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
+
+ return this;
+ }
+
+ /**
+ * Specifies the URL of the remote NiFi instance. If this URL points to
+ * the Cluster Manager of a NiFi cluster, data transfer to and from
+ * nodes will be automatically load balanced across the different nodes.
+ *
+ * @param url url of remote instance
+ * @return the builder
+ */
+ public Builder url(final String url) {
+ this.url = url;
+ return this;
+ }
+
+ /**
+ * Specifies the communications timeouts to use when interacting with
+ * the remote instances. The default value is 30 seconds.
+ *
+ * @param timeout to use when interacting with remote instances
+ * @param unit unit of time over which to interpret the given timeout
+ * @return the builder
+ */
+ public Builder timeout(final long timeout, final TimeUnit unit) {
+ this.timeoutNanos = unit.toNanos(timeout);
+ return this;
+ }
+
+ /**
+ * Specifies the amount of time that a connection can remain idle in the
+ * connection pool before it is "expired" and shutdown. The default
+ * value is 30 seconds.
+ *
+ * @param timeout amount of time a connection may remain idle before it is expired
+ * @param unit unit of time over which to interpret the given timeout
+ * @return the builder
+ */
+ public Builder idleExpiration(final long timeout, final TimeUnit unit) {
+ this.idleExpirationNanos = unit.toNanos(timeout);
+ return this;
+ }
+
+ /**
+ * If there is a problem communicating with a node (i.e., any node in
+ * the remote NiFi cluster or the remote instance of NiFi if it is
+ * standalone), specifies how long the client should wait before
+ * attempting to communicate with that node again. While a particular
+ * node is penalized, all other nodes in the remote cluster (if any)
+ * will still be available for communication. The default value is 3
+ * seconds.
+ *
+ * @param period time to wait between communication attempts
+ * @param unit over which to evaluate the given period
+ * @return the builder
+ */
+ public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) {
+ this.penalizationNanos = unit.toNanos(period);
+ return this;
+ }
+
/**
- * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
- * the client has the ability to request a particular batch size/duration. This method specifies
- * the preferred amount of time that a Transaction should span.
- *
- * @return
+ * Specifies the SSL Context to use when communicating with the remote
+ * NiFi instance(s). If not specified, communications will not be
+ * secure. The remote instance of NiFi always determines whether or not
+ * Site-to-Site communications are secure (i.e., the client will always
+ * use secure or non-secure communications, depending on what the server
+ * dictates).
+ *
+ * @param sslContext the context
+ * @return the builder
*/
- public Builder requestBatchDuration(final long value, final TimeUnit unit) {
- this.batchNanos = unit.toNanos(value);
- return this;
- }
-
- /**
- * Returns a {@link SiteToSiteClientConfig} for the configured values but does not create a SiteToSiteClient
- * @return
- */
- public SiteToSiteClientConfig buildConfig() {
- final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+ public Builder sslContext(final SSLContext sslContext) {
+ this.sslContext = sslContext;
+ return this;
+ }
+
+ /**
+ * Provides an EventReporter that can be used by the client in order to
+ * report any events that could be of interest when communicating with
+ * the remote instance. The EventReporter provided must be threadsafe.
+ *
+ * @param eventReporter reporter
+ * @return the builder
+ */
+ public Builder eventReporter(final EventReporter eventReporter) {
+ this.eventReporter = eventReporter;
+ return this;
+ }
+
+ /**
+ * Specifies a file that the client can write to in order to persist the
+ * list of nodes in the remote cluster and recover the list of nodes
+ * upon restart. This allows the client to function if the remote
+ * Cluster Manager is unavailable, even after a restart of the client
+ * software. If not specified, the list of nodes will not be persisted
+ * and a failure of the Cluster Manager will result in not being able to
+ * communicate with the remote instance if a new client is created.
+ *
+ * @param peerPersistenceFile file
+ * @return the builder
+ */
+ public Builder peerPersistenceFile(final File peerPersistenceFile) {
+ this.peerPersistenceFile = peerPersistenceFile;
+ return this;
+ }
+
+ /**
+ * Specifies whether or not data should be compressed before being
+ * transferred to or from the remote instance.
+ *
+ * @param compress true if should compress
+ * @return the builder
+ */
+ public Builder useCompression(final boolean compress) {
+ this.useCompression = compress;
+ return this;
+ }
+
+ /**
+ * Specifies the name of the port to communicate with. Either the port
+ * name or the port identifier must be specified.
+ *
+ * @param portName name of port
+ * @return the builder
+ */
+ public Builder portName(final String portName) {
+ this.portName = portName;
+ return this;
+ }
+
+ /**
+ * Specifies the unique identifier of the port to communicate with. If
+ * it is known, this is preferred over providing the port name, as the
+ * port name may change.
+ *
+ * @param portIdentifier identifier of port
+ * @return the builder
+ */
+ public Builder portIdentifier(final String portIdentifier) {
+ this.portIdentifier = portIdentifier;
+ return this;
+ }
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large
+ * a Transaction is. However, the client has the ability to request a
+ * particular batch size/duration. This method specifies the preferred
+ * number of {@link DataPacket}s to include in a Transaction.
+ *
+ * @param count client preferred number of DataPackets per Transaction
+ * @return the builder
+ */
+ public Builder requestBatchCount(final int count) {
+ this.batchCount = count;
+ return this;
+ }
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large
+ * a Transaction is. However, the client has the ability to request a
+ * particular batch size/duration. This method specifies the preferred
+ * number of bytes to include in a Transaction.
+ *
+ * @param bytes client preferred batch size
+ * @return the builder
+ */
+ public Builder requestBatchSize(final long bytes) {
+ this.batchSize = bytes;
+ return this;
+ }
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large
+ * a Transaction is. However, the client has the ability to request a
+ * particular batch size/duration. This method specifies the preferred
+ * amount of time that a Transaction should span.
+ *
+ * @param value client preferred batch duration
+ * @param unit client preferred batch duration unit
+ * @return the builder
+ */
+ public Builder requestBatchDuration(final long value, final TimeUnit unit) {
+ this.batchNanos = unit.toNanos(value);
+ return this;
+ }
+
+ /**
+ * @return a {@link SiteToSiteClientConfig} for the configured values
+ * but does not create a SiteToSiteClient
+ */
+ public SiteToSiteClientConfig buildConfig() {
+ final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
private static final long serialVersionUID = 1323119754841633818L;
@Override
public boolean isUseCompression() {
return Builder.this.isUseCompression();
}
-
+
@Override
public String getUrl() {
return Builder.this.getUrl();
}
-
+
@Override
public long getTimeout(final TimeUnit timeUnit) {
return Builder.this.getTimeout(timeUnit);
}
-
+
@Override
public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
return Builder.this.getIdleConnectionExpiration(timeUnit);
}
-
+
@Override
public SSLContext getSslContext() {
return Builder.this.getSslContext();
}
-
+
@Override
public String getPortName() {
return Builder.this.getPortName();
}
-
+
@Override
public String getPortIdentifier() {
return Builder.this.getPortIdentifier();
}
-
+
@Override
public long getPenalizationPeriod(final TimeUnit timeUnit) {
return Builder.this.getPenalizationPeriod(timeUnit);
}
-
+
@Override
public File getPeerPersistenceFile() {
return Builder.this.getPeerPersistenceFile();
}
-
+
@Override
public EventReporter getEventReporter() {
return Builder.this.getEventReporter();
@@ -397,123 +422,117 @@ public interface SiteToSiteClient extends Closeable {
public long getPreferredBatchDuration(final TimeUnit timeUnit) {
return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS);
}
-
+
@Override
public long getPreferredBatchSize() {
return Builder.this.batchSize;
}
-
+
@Override
public int getPreferredBatchCount() {
return Builder.this.batchCount;
}
};
-
+
return config;
- }
-
- /**
- * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi
- * @return
- *
- * @throws IllegalStateException if either the url is not set or neither the port name nor port identifier
- * is set.
- */
- public SiteToSiteClient build() {
- if ( url == null ) {
- throw new IllegalStateException("Must specify URL to build Site-to-Site client");
- }
-
- if ( portName == null && portIdentifier == null ) {
- throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
- }
-
- return new SocketClient(buildConfig());
- }
-
- /**
- * Returns the configured URL for the remote NiFi instance
- * @return
- */
- public String getUrl() {
- return url;
- }
-
- /**
- * Returns the communications timeout
- * @return
- */
- public long getTimeout(final TimeUnit timeUnit) {
- return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
- }
-
- /**
- * Returns the amount of of time that a connection can remain idle in the connection
- * pool before being shutdown
- * @param timeUnit
- * @return
- */
- public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
- return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);
- }
-
- /**
- * Returns the amount of time that a particular node will be ignored after a
- * communications error with that node occurs
- * @param timeUnit
- * @return
- */
- public long getPenalizationPeriod(TimeUnit timeUnit) {
- return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
- }
-
- /**
- * Returns the SSL Context that is configured for this builder
- * @return
- */
- public SSLContext getSslContext() {
- return sslContext;
- }
-
- /**
- * Returns the EventReporter that is to be used by clients to report events
- * @return
- */
- public EventReporter getEventReporter() {
- return eventReporter;
- }
-
- /**
- * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
- * @return
- */
- public File getPeerPersistenceFile() {
- return peerPersistenceFile;
- }
-
- /**
- * Returns a boolean indicating whether or not compression will be used to transfer data
- * to and from the remote instance
- * @return
- */
- public boolean isUseCompression() {
- return useCompression;
- }
-
- /**
- * Returns the name of the port that the client is to communicate with.
- * @return
- */
- public String getPortName() {
- return portName;
- }
-
- /**
- * Returns the identifier of the port that the client is to communicate with.
- * @return
- */
- public String getPortIdentifier() {
- return portIdentifier;
- }
- }
+ }
+
+ /**
+ * @return a new SiteToSiteClient that can be used to send and receive
+ * data with remote instances of NiFi
+ *
+ * @throws IllegalStateException if either the url is not set or neither
+ * the port name nor port identifier is set.
+ */
+ public SiteToSiteClient build() {
+ if (url == null) {
+ throw new IllegalStateException("Must specify URL to build Site-to-Site client");
+ }
+
+ if (portName == null && portIdentifier == null) {
+ throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
+ }
+
+ return new SocketClient(buildConfig());
+ }
+
+ /**
+ * @return the configured URL for the remote NiFi instance
+ */
+ public String getUrl() {
+ return url;
+ }
+
+ /**
+ * @param timeUnit unit over which to interpret the timeout
+ * @return the communications timeout
+ */
+ public long getTimeout(final TimeUnit timeUnit) {
+ return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * @param timeUnit unit over which to interpret the time
+ * @return the amount of time that a connection can remain idle in
+ * the connection pool before being shutdown
+ */
+ public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
+ return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * @param timeUnit unit of reported time
+ * @return the amount of time that a particular node will be ignored
+ * after a communications error with that node occurs
+ */
+ public long getPenalizationPeriod(TimeUnit timeUnit) {
+ return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * @return the SSL Context that is configured for this builder
+ */
+ public SSLContext getSslContext() {
+ return sslContext;
+ }
+
+ /**
+ * @return the EventReporter that is to be used by clients to report
+ * events
+ */
+ public EventReporter getEventReporter() {
+ return eventReporter;
+ }
+
+ /**
+ * @return the file that is to be used for persisting the nodes of a
+ * remote cluster, if any
+ */
+ public File getPeerPersistenceFile() {
+ return peerPersistenceFile;
+ }
+
+ /**
+ * @return a boolean indicating whether or not compression will be used
+ * to transfer data to and from the remote instance
+ */
+ public boolean isUseCompression() {
+ return useCompression;
+ }
+
+ /**
+ * @return the name of the port that the client is to communicate with
+ */
+ public String getPortName() {
+ return portName;
+ }
+
+ /**
+ * @return the identifier of the port that the client is to communicate
+ * with
+ */
+ public String getPortIdentifier() {
+ return portIdentifier;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 5e7fbe8..c4b0d22 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -27,97 +27,91 @@ import org.apache.nifi.remote.protocol.DataPacket;
public interface SiteToSiteClientConfig extends Serializable {
- /**
- * Returns the configured URL for the remote NiFi instance
- * @return
- */
- String getUrl();
-
- /**
- * Returns the communications timeout in nanoseconds
- * @return
- */
- long getTimeout(final TimeUnit timeUnit);
-
- /**
- * Returns the amount of time that a connection can remain idle before it is
- * "expired" and shut down
- * @param timeUnit
- * @return
- */
- long getIdleConnectionExpiration(TimeUnit timeUnit);
-
- /**
- * Returns the amount of time that a particular node will be ignored after a
- * communications error with that node occurs
- * @param timeUnit
- * @return
- */
- long getPenalizationPeriod(TimeUnit timeUnit);
-
- /**
- * Returns the SSL Context that is configured for this builder
- * @return
- */
- SSLContext getSslContext();
-
- /**
- * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
- * @return
- */
- File getPeerPersistenceFile();
-
- /**
- * Returns a boolean indicating whether or not compression will be used to transfer data
- * to and from the remote instance
- * @return
- */
- boolean isUseCompression();
-
- /**
- * Returns the name of the port that the client is to communicate with.
- * @return
- */
- String getPortName();
-
- /**
- * Returns the identifier of the port that the client is to communicate with.
- * @return
- */
- String getPortIdentifier();
-
- /**
- * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
- * the client has the ability to request a particular batch size/duration. This returns the maximum
- * amount of time that we will request a NiFi instance to send data to us in a Transaction.
- *
- * @param timeUnit
- * @return
- */
- long getPreferredBatchDuration(TimeUnit timeUnit);
-
/**
- * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
- * the client has the ability to request a particular batch size/duration. This returns the maximum
- * number of bytes that we will request a NiFi instance to send data to us in a Transaction.
- *
- * @return
+ * @return the configured URL for the remote NiFi instance
*/
- long getPreferredBatchSize();
-
-
- /**
- * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
- * the client has the ability to request a particular batch size/duration. This returns the maximum
- * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction.
- *
- * @return
+ String getUrl();
+
+ /**
+ * @param timeUnit unit in which to report the timeout
+ * @return the communications timeout in the given unit
+ */
+ long getTimeout(final TimeUnit timeUnit);
+
+ /**
+ * @param timeUnit the unit in which to report the time
+ * @return the amount of time that a connection can remain idle before it is
+ * "expired" and shut down
+ */
+ long getIdleConnectionExpiration(TimeUnit timeUnit);
+
+ /**
+ * @param timeUnit unit in which to report the time
+ * @return the amount of time that a particular node will be ignored after a
+ * communications error with that node occurs
+ */
+ long getPenalizationPeriod(TimeUnit timeUnit);
+
+ /**
+ * @return the SSL Context that is configured for this configuration
+ */
+ SSLContext getSslContext();
+
+ /**
+ * @return the file that is to be used for persisting the nodes of a remote
+ * cluster, if any
+ */
+ File getPeerPersistenceFile();
+
+ /**
+ * @return a boolean indicating whether or not compression will be used to
+ * transfer data to and from the remote instance
*/
- int getPreferredBatchCount();
-
- /**
- * Returns the EventReporter that is to be used by clients to report events
- * @return
+ boolean isUseCompression();
+
+ /**
+ * @return the name of the port that the client is to communicate with
+ */
+ String getPortName();
+
+ /**
+ * @return the identifier of the port that the client is to communicate with
+ */
+ String getPortIdentifier();
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a
+ * Transaction is. However, the client has the ability to request a
+ * particular batch size/duration.
+ *
+ * @param timeUnit unit of time in which to report the duration
+ * @return the maximum amount of time that we will request a NiFi instance
+ * to send data to us in a Transaction
+ */
+ long getPreferredBatchDuration(TimeUnit timeUnit);
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a
+ * Transaction is. However, the client has the ability to request a
+ * particular batch size/duration.
+ *
+ * @return the maximum number of bytes that we will request a NiFi
+ * instance to send data to us in a Transaction
+ */
+ long getPreferredBatchSize();
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a
+ * Transaction is. However, the client has the ability to request a
+ * particular batch size/duration.
+ *
+ * @return the maximum number of {@link DataPacket}s that we will request a
+ * NiFi instance to send data to us in a Transaction
+ */
+ int getPreferredBatchCount();
+
+ /**
+ * @return the EventReporter that is to be used by clients to report events
*/
EventReporter getEventReporter();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
index 651ae50..1a16b02 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
@@ -21,33 +21,34 @@ import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
public class EndpointConnection {
- private final Peer peer;
+
+ private final Peer peer;
private final SocketClientProtocol socketClientProtocol;
private final FlowFileCodec codec;
private volatile long lastUsed;
-
+
public EndpointConnection(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
this.peer = peer;
this.socketClientProtocol = socketClientProtocol;
this.codec = codec;
}
-
+
public FlowFileCodec getCodec() {
return codec;
}
-
+
public SocketClientProtocol getSocketClientProtocol() {
return socketClientProtocol;
}
-
+
public Peer getPeer() {
return peer;
}
-
+
public void setLastTimeUsed() {
lastUsed = System.currentTimeMillis();
}
-
+
public long getLastTimeUsed() {
return lastUsed;
}