You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:13:19 UTC
[29/49] incubator-nifi git commit: NIFI-271 checkpoint
NIFI-271 checkpoint
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b612b6bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b612b6bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b612b6bc
Branch: refs/heads/develop
Commit: b612b6bcd9f2f29a4466360d5fbbeccee62ae650
Parents: afb4fe5
Author: joewitt <jo...@apache.org>
Authored: Tue Apr 21 23:39:31 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Apr 21 23:39:31 2015 -0400
----------------------------------------------------------------------
.../nifi/processor/util/FlowFileFilters.java | 2 +-
.../nifi/processor/util/SSLProperties.java | 8 +-
.../nifi/processor/util/StandardValidators.java | 140 +++-
.../processor/util/TestStandardValidators.java | 18 +-
.../nifi-site-to-site-client/pom.xml | 64 +-
.../remote/AbstractCommunicationsSession.java | 9 +-
.../org/apache/nifi/remote/Communicant.java | 23 +-
.../main/java/org/apache/nifi/remote/Peer.java | 20 +-
.../org/apache/nifi/remote/PeerDescription.java | 7 +-
.../java/org/apache/nifi/remote/PeerStatus.java | 7 +-
.../nifi/remote/RemoteResourceInitiator.java | 48 +-
.../org/apache/nifi/remote/Transaction.java | 295 +++----
.../nifi/remote/TransactionCompletion.java | 49 +-
.../apache/nifi/remote/TransferDirection.java | 13 +-
.../nifi/remote/client/SiteToSiteClient.java | 831 ++++++++++---------
.../remote/client/SiteToSiteClientConfig.java | 170 ++--
.../client/socket/EndpointConnection.java | 15 +-
.../client/socket/EndpointConnectionPool.java | 627 +++++++-------
.../nifi/remote/client/socket/SocketClient.java | 319 ++++---
.../apache/nifi/remote/codec/FlowFileCodec.java | 15 +-
.../remote/codec/StandardFlowFileCodec.java | 33 +-
.../remote/exception/HandshakeException.java | 9 +-
.../exception/PortNotRunningException.java | 7 +-
.../remote/exception/ProtocolException.java | 4 +-
.../remote/exception/UnknownPortException.java | 6 +-
.../SocketChannelCommunicationsSession.java | 23 +-
.../remote/io/socket/SocketChannelInput.java | 11 +-
.../remote/io/socket/SocketChannelOutput.java | 11 +-
.../SSLSocketChannelCommunicationsSession.java | 23 +-
.../io/socket/ssl/SSLSocketChannelInput.java | 9 +-
.../io/socket/ssl/SSLSocketChannelOutput.java | 5 +-
.../nifi/remote/protocol/ClientProtocol.java | 18 +-
.../remote/protocol/CommunicationsInput.java | 5 +-
.../remote/protocol/CommunicationsSession.java | 6 +-
.../apache/nifi/remote/protocol/DataPacket.java | 33 +-
.../protocol/socket/HandshakeProperty.java | 38 +-
.../nifi/remote/protocol/socket/Response.java | 11 +-
.../remote/protocol/socket/ResponseCode.java | 65 +-
.../protocol/socket/SocketClientProtocol.java | 388 ++++-----
.../socket/SocketClientTransaction.java | 468 +++++------
.../SocketClientTransactionCompletion.java | 2 +-
.../nifi/remote/util/NiFiRestApiUtil.java | 24 +-
.../nifi/remote/util/PeerStatusCache.java | 3 +-
.../nifi/remote/util/StandardDataPacket.java | 44 +-
.../socket/TestEndpointConnectionStatePool.java | 17 +-
.../client/socket/TestSiteToSiteClient.java | 42 +-
.../nifi/io/nio/AbstractChannelReader.java | 10 +-
.../java/org/apache/nifi/io/nio/BufferPool.java | 10 +-
.../apache/nifi/io/nio/ChannelDispatcher.java | 14 +-
.../org/apache/nifi/io/nio/ChannelListener.java | 7 +-
.../nifi/io/nio/DatagramChannelReader.java | 12 +-
.../apache/nifi/io/nio/SocketChannelReader.java | 12 +-
.../nifi/io/nio/consumer/StreamConsumer.java | 12 +-
.../nifi/io/socket/SSLContextFactory.java | 12 +-
.../io/socket/ServerSocketConfiguration.java | 6 +-
.../nifi/io/socket/SocketConfiguration.java | 6 +-
.../org/apache/nifi/io/socket/SocketUtils.java | 6 +-
.../io/socket/multicast/MulticastListener.java | 5 +-
.../apache/nifi/io/nio/example/TCPClient.java | 3 +-
.../org/wali/MinimalLockingWriteAheadLog.java | 70 +-
.../src/main/java/org/wali/SerDe.java | 40 +-
.../java/org/wali/WriteAheadRepository.java | 16 +-
...kControllerServiceInitializationContext.java | 8 +-
.../nifi/util/MockControllerServiceLookup.java | 14 +-
.../java/org/apache/nifi/util/MockFlowFile.java | 20 +-
.../apache/nifi/util/MockProcessContext.java | 15 +-
.../apache/nifi/util/MockProcessSession.java | 33 +-
.../MockProcessorInitializationContext.java | 4 +-
.../org/apache/nifi/util/MockProcessorLog.java | 124 ---
.../nifi/util/MockProvenanceReporter.java | 4 +-
.../apache/nifi/util/MockReportingContext.java | 6 +-
.../MockReportingInitializationContext.java | 2 +-
.../apache/nifi/util/MockValidationContext.java | 20 +-
.../org/apache/nifi/util/ReflectionUtils.java | 21 +-
.../apache/nifi/util/SharedSessionState.java | 1 -
.../nifi/util/StandardProcessorTestRunner.java | 123 ++-
.../java/org/apache/nifi/util/TestRunner.java | 428 +++++-----
.../util/TestStandardProcessorTestRunner.java | 30 +-
.../apache/nifi/documentation/DocGenerator.java | 12 +-
.../html/HtmlDocumentationWriter.java | 231 ++----
.../html/HtmlProcessorDocumentationWriter.java | 77 +-
.../FullyDocumentedControllerService.java | 51 +-
.../example/FullyDocumentedProcessor.java | 156 ++--
.../example/FullyDocumentedReportingTask.java | 32 +-
.../documentation/example/NakedProcessor.java | 8 +-
.../documentation/example/SampleService.java | 2 +-
.../html/HtmlDocumentationWriterTest.java | 94 +--
.../html/ProcessorDocumentationWriterTest.java | 135 ++-
.../nifi/documentation/html/XmlValidator.java | 29 +-
.../org/apache/nifi/nar/ExtensionManager.java | 14 +-
.../org/apache/nifi/nar/NarClassLoader.java | 24 +-
.../org/apache/nifi/nar/NarClassLoaders.java | 40 +-
.../java/org/apache/nifi/nar/NarUnpacker.java | 14 +-
.../java/org/apache/nifi/util/FileUtils.java | 28 +-
94 files changed, 2944 insertions(+), 3082 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
index 1f77093..2d1a407 100644
--- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
+++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
@@ -32,7 +32,7 @@ public class FlowFileFilters {
* @param maxSize the maximum size of the group of FlowFiles
* @param unit the unit of the <code>maxSize</code> argument
* @param maxCount the maximum number of FlowFiles to pull
- * @return
+ * @return filter
*/
public static FlowFileFilter newSizeBasedFilter(final double maxSize, final DataUnit unit, final int maxCount) {
final double maxBytes = DataUnit.B.convert(maxSize, unit);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
index 0d66df5..87d63de 100644
--- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
+++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
@@ -130,7 +130,13 @@ public class SSLProperties {
try {
final boolean storeValid = CertificateUtils.isStoreValid(file.toURI().toURL(), KeystoreType.valueOf(type), password.toCharArray());
if (!storeValid) {
- results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Invalid KeyStore Password or Type specified for file " + filename).build());
+ results.add(
+ new ValidationResult.Builder()
+ .subject(keystoreDesc + " Properties")
+ .valid(false)
+ .explanation("Invalid KeyStore Password or Type specified for file " + filename)
+ .build()
+ );
}
} catch (MalformedURLException e) {
results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Malformed URL from file: " + e).build());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index d1621ed..37ba7d8 100644
--- a/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -46,10 +46,10 @@ public class StandardValidators {
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final ValidationResult.Builder builder = new ValidationResult.Builder();
builder.subject(subject).input(input);
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return builder.valid(true).explanation("Contains Expression Language").build();
}
-
+
try {
FlowFile.KeyValidator.validateKey(input);
builder.valid(true);
@@ -66,10 +66,10 @@ public class StandardValidators {
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final ValidationResult.Builder builder = new ValidationResult.Builder();
builder.subject("Property Name").input(subject);
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return builder.valid(true).explanation("Contains Expression Language").build();
}
-
+
try {
FlowFile.KeyValidator.validateKey(subject);
builder.valid(true);
@@ -84,10 +84,10 @@ public class StandardValidators {
public static final Validator POSITIVE_INTEGER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
-
+
String reason = null;
try {
final int intVal = Integer.parseInt(value);
@@ -106,7 +106,7 @@ public class StandardValidators {
public static final Validator POSITIVE_LONG_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -137,7 +137,7 @@ public class StandardValidators {
public static final Validator BOOLEAN_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -150,7 +150,7 @@ public class StandardValidators {
public static final Validator INTEGER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -168,7 +168,7 @@ public class StandardValidators {
public static final Validator LONG_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -186,7 +186,7 @@ public class StandardValidators {
public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -208,7 +208,7 @@ public class StandardValidators {
public static final Validator CHARACTER_SET_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -235,7 +235,7 @@ public class StandardValidators {
public static final Validator URI_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -253,7 +253,7 @@ public class StandardValidators {
public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -270,7 +270,7 @@ public class StandardValidators {
public static final Validator TIME_PERIOD_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -280,7 +280,14 @@ public class StandardValidators {
if (Pattern.compile(FormatUtils.TIME_DURATION_REGEX).matcher(input.toLowerCase()).matches()) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days").build();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(false)
+ .explanation("Must be of format <duration> <TimeUnit> where <duration> is a "
+ + "non-negative integer and TimeUnit is a supported Time Unit, such "
+ + "as: nanos, millis, secs, mins, hrs, days")
+ .build();
}
}
};
@@ -288,17 +295,28 @@ public class StandardValidators {
public static final Validator DATA_SIZE_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
if (input == null) {
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Data Size cannot be null").build();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(false)
+ .explanation("Data Size cannot be null")
+ .build();
}
if (Pattern.compile(DataUnit.DATA_SIZE_REGEX).matcher(input.toUpperCase()).matches()) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <Data Size> <Data Unit> where <Data Size> is a non-negative integer and <Data Unit> is a supported Data Unit, such as: B, KB, MB, GB, TB").build();
+ return new ValidationResult.Builder()
+ .subject(subject).input(input)
+ .valid(false)
+ .explanation("Must be of format <Data Size> <Data Unit> where <Data Size>"
+ + " is a non-negative integer and <Data Unit> is a supported Data"
+ + " Unit, such as: B, KB, MB, GB, TB")
+ .build();
}
}
};
@@ -318,7 +336,7 @@ public class StandardValidators {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -346,19 +364,19 @@ public class StandardValidators {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final ValidationResult vr = DATA_SIZE_VALIDATOR.validate(subject, input, context);
- if(!vr.isValid()){
+ if (!vr.isValid()) {
return vr;
}
final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue();
- if(dataSizeBytes < minBytesInclusive){
+ if (dataSizeBytes < minBytesInclusive) {
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build();
}
- if(dataSizeBytes > maxBytesInclusive){
+ if (dataSizeBytes > maxBytesInclusive) {
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build();
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
@@ -371,7 +389,7 @@ public class StandardValidators {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -397,10 +415,11 @@ public class StandardValidators {
* Language will not support FlowFile Attributes but only System/JVM
* Properties
*
- * @param minCapturingGroups
- * @param maxCapturingGroups
- * @param supportAttributeExpressionLanguage
- * @return
+ * @param minCapturingGroups minimum capturing groups allowed
+ * @param maxCapturingGroups maximum capturing groups allowed
+ * @param supportAttributeExpressionLanguage whether or not to support
+ * expression language
+ * @return validator
*/
public static Validator createRegexValidator(final int minCapturingGroups, final int maxCapturingGroups, final boolean supportAttributeExpressionLanguage) {
return new Validator() {
@@ -412,7 +431,12 @@ public class StandardValidators {
try {
substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
} catch (final Exception e) {
- return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString()).build();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(value)
+ .valid(false)
+ .explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString())
+ .build();
}
} else {
substituted = value;
@@ -421,12 +445,22 @@ public class StandardValidators {
final Pattern pattern = Pattern.compile(substituted);
final int numGroups = pattern.matcher("").groupCount();
if (numGroups < minCapturingGroups || numGroups > maxCapturingGroups) {
- return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("RegEx is required to have between " + minCapturingGroups + " and " + maxCapturingGroups + " Capturing Groups but has " + numGroups).build();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(value)
+ .valid(false)
+ .explanation("RegEx is required to have between " + minCapturingGroups + " and " + maxCapturingGroups + " Capturing Groups but has " + numGroups)
+ .build();
}
return new ValidationResult.Builder().subject(subject).input(value).valid(true).build();
} catch (final Exception e) {
- return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Not a valid Java Regular Expression").build();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(value)
+ .valid(false)
+ .explanation("Not a valid Java Regular Expression")
+ .build();
}
}
@@ -444,7 +478,12 @@ public class StandardValidators {
final ResultType resultType = allowExtraCharacters ? ResultType.STRING : context.newExpressionLanguageCompiler().getResultType(input);
if (!resultType.equals(expectedResultType)) {
- return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + resultType).build();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(false)
+ .explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + resultType)
+ .build();
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
@@ -456,7 +495,7 @@ public class StandardValidators {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -502,7 +541,7 @@ public class StandardValidators {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -523,7 +562,8 @@ public class StandardValidators {
}
} else {
builder.subject(subject).input(input).valid(false)
- .explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days");
+ .explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative "
+ + "integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days");
}
return builder.build();
}
@@ -539,7 +579,7 @@ public class StandardValidators {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -574,7 +614,7 @@ public class StandardValidators {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
@@ -617,17 +657,19 @@ public class StandardValidators {
/**
* Creates a validator based on existence of a {@link ControllerService}.
- *
- * @param serviceClass the controller service API your {@link ConfigurableComponent} depends on
+ *
+ * @param serviceClass the controller service API your
+ * {@link ConfigurableComponent} depends on
* @return a Validator
- * @deprecated As of release 0.1.0-incubating, replaced by {@link org.apache.nifi.components.PropertyDescriptor.Builder#identifiesControllerService(Class)}
+ * @deprecated As of release 0.1.0-incubating, replaced by
+ * {@link org.apache.nifi.components.PropertyDescriptor.Builder#identifiesControllerService(Class)}
*/
@Deprecated
public static Validator createControllerServiceExistsValidator(final Class<? extends ControllerService> serviceClass) {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if ( context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) ) {
+ if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
@@ -638,14 +680,24 @@ public class StandardValidators {
}
if (!serviceClass.isAssignableFrom(svc.getClass())) {
- return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("Controller Service with this ID is of type " + svc.getClass().getName() + " but is expected to be of type " + serviceClass.getName()).build();
+ return new ValidationResult.Builder()
+ .valid(false)
+ .input(input)
+ .subject(subject)
+ .explanation("Controller Service with this ID is of type " + svc.getClass().getName() + " but is expected to be of type " + serviceClass.getName())
+ .build();
}
final ValidationContext serviceValidationContext = context.getControllerServiceValidationContext(svc);
final Collection<ValidationResult> serviceValidationResults = svc.validate(serviceValidationContext);
for (final ValidationResult result : serviceValidationResults) {
if (!result.isValid()) {
- return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation("Controller Service " + input + " is not valid: " + result.getExplanation()).build();
+ return new ValidationResult.Builder()
+ .valid(false)
+ .input(input)
+ .subject(subject)
+ .explanation("Controller Service " + input + " is not valid: " + result.getExplanation())
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
index a8f4bae..bcd402d 100644
--- a/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
+++ b/nifi/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
@@ -35,7 +35,7 @@ public class TestStandardValidators {
ValidationResult vr;
final ValidationContext validationContext = Mockito.mock(ValidationContext.class);
-
+
vr = val.validate("TimePeriodTest", "0 sense made", validationContext);
assertFalse(vr.isValid());
@@ -54,22 +54,22 @@ public class TestStandardValidators {
vr = val.validate("TimePeriodTest", "1 sec", validationContext);
assertTrue(vr.isValid());
}
-
+
@Test
public void testDataSizeBoundsValidator() {
Validator val = StandardValidators.createDataSizeBoundsValidator(100, 1000);
- ValidationResult vr;
-
+ ValidationResult vr;
+
final ValidationContext validationContext = Mockito.mock(ValidationContext.class);
vr = val.validate("DataSizeBounds", "5 GB", validationContext);
assertFalse(vr.isValid());
-
+
vr = val.validate("DataSizeBounds", "0 B", validationContext);
assertFalse(vr.isValid());
vr = val.validate("DataSizeBounds", "99 B", validationContext);
assertFalse(vr.isValid());
-
+
vr = val.validate("DataSizeBounds", "100 B", validationContext);
assertTrue(vr.isValid());
@@ -78,12 +78,12 @@ public class TestStandardValidators {
vr = val.validate("DataSizeBounds", "1000 B", validationContext);
assertTrue(vr.isValid());
-
+
vr = val.validate("DataSizeBounds", "1001 B", validationContext);
assertFalse(vr.isValid());
-
+
vr = val.validate("DataSizeBounds", "water", validationContext);
assertFalse(vr.isValid());
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
index 5d3d93e..c024be6 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -14,41 +14,41 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-commons</artifactId>
- <version>0.1.0-incubating-SNAPSHOT</version>
- </parent>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-commons</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ </parent>
- <artifactId>nifi-site-to-site-client</artifactId>
+ <artifactId>nifi-site-to-site-client</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-client-dto</artifactId>
- <version>0.1.0-incubating-SNAPSHOT</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-client-dto</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
index 4babb92..dacfd64 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
@@ -19,14 +19,15 @@ package org.apache.nifi.remote;
import org.apache.nifi.remote.protocol.CommunicationsSession;
public abstract class AbstractCommunicationsSession implements CommunicationsSession {
+
private String userDn;
-
+
private volatile String uri;
-
+
public AbstractCommunicationsSession(final String uri) {
this.uri = uri;
}
-
+
@Override
public String toString() {
return uri;
@@ -46,7 +47,7 @@ public abstract class AbstractCommunicationsSession implements CommunicationsSes
public String getUserDn() {
return userDn;
}
-
+
@Override
public void setUserDn(final String dn) {
this.userDn = dn;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
index ac2d498..17b990e 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
@@ -20,28 +20,27 @@ package org.apache.nifi.remote;
* Represents the remote entity that the client is communicating with
*/
public interface Communicant {
+
/**
- * Returns the NiFi site-to-site URL for the remote NiFi instance
- * @return
+ * @return the NiFi site-to-site URL for the remote NiFi instance
*/
String getUrl();
-
+
/**
- * The Host of the remote NiFi instance
- * @return
+ * @return The Host of the remote NiFi instance
*/
String getHost();
-
+
/**
- * The Port that the remote NiFi instance is listening on for site-to-site communications
- * @return
+ * @return The Port that the remote NiFi instance is listening on for
+ * site-to-site communications
*/
int getPort();
-
+
/**
- * The distinguished name that the remote NiFi instance has provided in its certificate if
- * using secure communications, or <code>null</code> if the Distinguished Name is unknown
- * @return
+ * @return The distinguished name that the remote NiFi instance has provided
+ * in its certificate if using secure communications, or <code>null</code>
+ * if the Distinguished Name is unknown
*/
String getDistinguishedName();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index 2428078..5cb37b0 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -31,7 +31,7 @@ public class Peer implements Communicant {
private final String clusterUrl;
private final String host;
private final int port;
-
+
private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false;
@@ -53,14 +53,14 @@ public class Peer implements Communicant {
public PeerDescription getDescription() {
return description;
}
-
+
@Override
public String getUrl() {
return url;
}
-
+
public String getClusterUrl() {
- return clusterUrl;
+ return clusterUrl;
}
public CommunicationsSession getCommunicationsSession() {
@@ -79,24 +79,24 @@ public class Peer implements Communicant {
}
/**
- * Penalizes this peer for the given destination only for the provided number of milliseconds
- * @param destinationId
- * @param millis
+ * Penalizes this peer for the given destination only for the provided
+ * number of milliseconds
+ *
+ * @param destinationId id of destination
+ * @param millis period of time to penalize peer
*/
public void penalize(final String destinationId, final long millis) {
final Long currentPenalty = penaltyExpirationMap.get(destinationId);
final long proposedPenalty = System.currentTimeMillis() + millis;
- if ( currentPenalty == null || proposedPenalty > currentPenalty ) {
+ if (currentPenalty == null || proposedPenalty > currentPenalty) {
penaltyExpirationMap.put(destinationId, proposedPenalty);
}
}
-
public boolean isPenalized(final String destinationId) {
final Long currentPenalty = penaltyExpirationMap.get(destinationId);
return (currentPenalty != null && currentPenalty > System.currentTimeMillis());
}
-
public boolean isClosed() {
return closed;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
index 0e8e498..6fc90e4 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
@@ -17,10 +17,11 @@
package org.apache.nifi.remote;
public class PeerDescription {
+
private final String hostname;
private final int port;
private final boolean secure;
-
+
public PeerDescription(final String hostname, final int port, final boolean secure) {
this.hostname = hostname;
this.port = port;
@@ -64,7 +65,7 @@ public class PeerDescription {
if (getClass() != obj.getClass()) {
return false;
}
-
+
final PeerDescription other = (PeerDescription) obj;
if (hostname == null) {
if (other.hostname != null) {
@@ -73,7 +74,7 @@ public class PeerDescription {
} else if (!hostname.equals(other.hostname)) {
return false;
}
-
+
return port == other.port;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
index b68ac33..6c8a4ec 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -17,6 +17,7 @@
package org.apache.nifi.remote;
public class PeerStatus {
+
private final PeerDescription description;
private final int numFlowFiles;
@@ -28,15 +29,15 @@ public class PeerStatus {
public PeerDescription getPeerDescription() {
return description;
}
-
+
public int getFlowFileCount() {
return numFlowFiles;
}
@Override
public String toString() {
- return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort() +
- ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]";
+ return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort()
+ + ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]";
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
index f469724..582916e 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
@@ -25,49 +25,51 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RemoteResourceInitiator {
- public static final int RESOURCE_OK = 20;
- public static final int DIFFERENT_RESOURCE_VERSION = 21;
- public static final int ABORT = 255;
- private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class);
-
- public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+ public static final int RESOURCE_OK = 20;
+ public static final int DIFFERENT_RESOURCE_VERSION = 21;
+ public static final int ABORT = 255;
+
+ private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class);
+
+ public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos)
+ throws IOException, HandshakeException {
// Write the classname of the RemoteStreamCodec, followed by its version
- logger.debug("Negotiating resource; proposal is {}", resource);
- dos.writeUTF(resource.getResourceName());
- final VersionNegotiator negotiator = resource.getVersionNegotiator();
- dos.writeInt(negotiator.getVersion());
- dos.flush();
-
+ logger.debug("Negotiating resource; proposal is {}", resource);
+ dos.writeUTF(resource.getResourceName());
+ final VersionNegotiator negotiator = resource.getVersionNegotiator();
+ dos.writeInt(negotiator.getVersion());
+ dos.flush();
+
// wait for response from server.
- logger.debug("Receiving response from remote instance");
+ logger.debug("Receiving response from remote instance");
final int statusCode = dis.read();
switch (statusCode) {
- case RESOURCE_OK: // server accepted our proposal of codec name/version
+ case RESOURCE_OK: // server accepted our proposal of codec name/version
logger.debug("Response was RESOURCE_OK");
return resource;
- case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version
+ case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version
logger.debug("Response was DIFFERENT_RESOURCE_VERSION");
// Get server's preferred version
- final int newVersion = dis.readInt();
-
+ final int newVersion = dis.readInt();
+
// Determine our new preferred version that is no greater than the server's preferred version.
final Integer newPreference = negotiator.getPreferredVersion(newVersion);
// If we could not agree with server on a version, fail now.
- if ( newPreference == null ) {
+ if (newPreference == null) {
throw new HandshakeException("Could not agree on version for " + resource);
}
-
+
negotiator.setVersion(newPreference);
-
+
// Attempt negotiation of resource based on our new preferred version.
return initiateResourceNegotiation(resource, dis, dos);
case ABORT:
logger.debug("Response was ABORT");
- throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+ throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
default:
logger.debug("Response was {}; unable to negotiate codec", statusCode);
- return null; // Unable to negotiate codec
+ return null; // Unable to negotiate codec
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index eb7312d..bfa5c82 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -21,52 +21,57 @@ import java.util.Map;
import org.apache.nifi.remote.protocol.DataPacket;
-
/**
* <p>
* Provides a transaction for performing site-to-site data transfers.
* </p>
- *
+ *
* <p>
- * A Transaction is created by calling the
- * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)}
- * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction
- * can be used to either send or receive data but not both. A new Transaction must be created in order perform the
- * other operation.
+ * A Transaction is created by calling the
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)}
+ * method of a
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The
+ * resulting Transaction can be used to either send or receive data but not
+ * both. A new Transaction must be created in order to perform the other operation.
* </p>
- *
+ *
* <p>
* The general flow of execute of a Transaction is as follows:
* <ol>
- * <li>Create the transaction as described above.</li>
- * <li>Send data via the {@link #send(DataPacket)} method or receive data via the {@link #receive()} method. This method
- * will be called 1 or more times. In the case of receive, this method should be called until the method returns {@code null},
- * signifying that the remote instance is finished sending data. <b>Note:</b> <code>receive()</code> should not be
- * called a second time without first fully consuming the stream from the previous Packet that was received.</li>
- * <li>Confirm the transaction via the {@link #confirm()} method.</li>
- * <li>Either complete the transaction via the {@link #complete(boolean)} method or cancel the transaction
- * via the {@link #cancel()} method.</li>
+ * <li>Create the transaction as described above.</li>
+ * <li>Send data via the {@link #send(DataPacket)} method or receive data via
+ * the {@link #receive()} method. This method will be called 1 or more times. In
+ * the case of receive, this method should be called until the method returns
+ * {@code null}, signifying that the remote instance is finished sending data.
+ * <b>Note:</b> <code>receive()</code> should not be called a second time
+ * without first fully consuming the stream from the previous Packet that was
+ * received.</li>
+ * <li>Confirm the transaction via the {@link #confirm()} method.</li>
+ * <li>Either complete the transaction via the {@link #complete()} method
+ * or cancel the transaction via the {@link #cancel(String)} method.</li>
* </ol>
* </p>
- *
+ *
* <p>
- * It is important that the Transaction be terminated in order to free the resources held
- * by the Transaction. If a Transaction is not terminated, its resources will not be freed and
- * if the Transaction holds connections from a connection pool, the connections in that pool
- * will eventually become exhausted. A Transaction is terminated by calling one of the following
+ * It is important that the Transaction be terminated in order to free the
+ * resources held by the Transaction. If a Transaction is not terminated, its
+ * resources will not be freed and if the Transaction holds connections from a
+ * connection pool, the connections in that pool will eventually become
+ * exhausted. A Transaction is terminated by calling one of the following
* methods:
- * <ul>
- * <li>{@link #complete(boolean)}</li>
- * <li>{@link #cancel()}</li>
- * <li>{@link #error()}</li>
- * </ul>
+ * <ul>
+ * <li>{@link #complete()}</li>
+ * <li>{@link #cancel(String)}</li>
+ * <li>{@link #error()}</li>
+ * </ul>
* </p>
- *
+ *
* <p>
- * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction
- * is automatically closed via a call to {@link #error()}.
+ * If at any point an IOException is thrown from one of the methods of the
+ * Transaction, that Transaction is automatically closed via a call to
+ * {@link #error()}.
* </p>
- *
+ *
* <p>
* The Transaction class should not be assumed to be thread-safe.
* </p>
@@ -75,140 +80,146 @@ public interface Transaction {
/**
* Sends information to the remote NiFi instance.
- *
+ *
* @param dataPacket the data packet to send
- * @throws IOException
+ * @throws IOException if unable to send
*/
void send(DataPacket dataPacket) throws IOException;
-
+
/**
- * Sends the given byte array as the content of a {@link DataPacket} along with the
- * provided attributes
- *
- * @param content
- * @param attributes
- * @throws IOException
+ * Sends the given byte array as the content of a {@link DataPacket} along
+ * with the provided attributes
+ *
+ * @param content to send
+ * @param attributes of the content
+ * @throws IOException if unable to send
*/
void send(byte[] content, Map<String, String> attributes) throws IOException;
-
+
/**
- * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return
- * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to
- * call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction
- * has finished. This is done in order to prevent the need for a round-trip network request to receive data for
+ * Retrieves information from the remote NiFi instance, if any is available.
+ * If no data is available, will return {@code null}. It is important to
+ * consume all data from the remote NiFi instance before attempting to call
+ * {@link #confirm()}. This is because the sender is always responsible for
+ * determining when the Transaction has finished. This is done in order to
+ * prevent the need for a round-trip network request to receive data for
* each data packet.
- *
- * @return the DataPacket received, or {@code null} if there is no more data to receive.
- * @throws IOException
+ *
+ * @return the DataPacket received, or {@code null} if there is no more data
+ * to receive.
+ * @throws IOException if unable to receive
*/
DataPacket receive() throws IOException;
/**
* <p>
- * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received.
+ * Confirms the data that was sent or received by comparing CRC32's of the
+ * data sent and the data received.
+ * </p>
+ *
+ * <p>
+ * Even if the protocol being used to send the data is reliable and
+ * guarantees ordering of packets (such as TCP), it is still required that
+ * we confirm the transaction before completing the transaction. This is
+ * done as a "safety net" or a defensive programming technique. Mistakes
+ * happen, and this mechanism helps to ensure that if a bug exists somewhere
+ * along the line that we do not end up sending or receiving corrupt data.
+ * If the CRC32 of the sender and the CRC32 of the receiver do not match, an
+ * IOException will be thrown and both the sender and receiver will cancel
+ * the transaction automatically.
* </p>
- *
+ *
* <p>
- * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP),
- * it is still required that we confirm the transaction before completing the transaction. This is done as
- * "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if
- * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the
- * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the
- * sender and receiver will cancel the transaction automatically.
+ * If the {@link TransferDirection} of this Transaction is RECEIVE, this
+ * method will throw an Exception unless all data from the remote instance
+ * has been consumed (i.e., a call to {@link #receive()} returns
+ * {@code null}).
* </p>
- *
+ *
+ * <p>
+ * If the {@link TransferDirection} of this Transaction is SEND, calling
+ * this method dictates that no more data will be sent in this transaction.
+ * I.e., there will be no more calls to {@link #send(DataPacket)}.
+ * </p>
+ *
+ * @throws IOException if unable to confirm transaction
+ */
+ void confirm() throws IOException;
+
+ /**
* <p>
- * If the {@link TransferDirection} of this Transaction is RECEIVE, this method will throw an Exception unless
- * all data from the remote instance has been consumed (i.e., a call to {@link #receive()} returns {@code null}).
+ * Completes the transaction and indicates to both the sender and receiver
+ * that the data transfer was successful.
* </p>
- *
+ *
+ * @throws IOException if unable to complete
+ *
+ * @return a TransactionCompletion that contains details about the
+ * Transaction
+ */
+ TransactionCompletion complete() throws IOException;
+
+ /**
* <p>
- * If the {@link TransferDirection} of this Transaction is SEND, calling this method dictates that no more data will be
- * sent in this transaction. I.e., there will be no more calls to {@link #send(DataPacket)}.
+ * Cancels this transaction, indicating to the sender that the data has not
+ * been successfully received so that the sender can retry or handle however
+ * is appropriate.
* </p>
- *
- * @throws IOException
+ *
+ * @param explanation an explanation to tell the other party why the
+ * transaction was canceled.
+ * @throws IOException if unable to cancel
*/
- void confirm() throws IOException;
-
- /**
+ void cancel(final String explanation) throws IOException;
+
+ /**
* <p>
- * Completes the transaction and indicates to both the sender and receiver that the data transfer was
- * successful.
+ * Sets the TransactionState of the Transaction to
+ * {@link TransactionState#ERROR}, and closes the Transaction. The
+ * underlying connection should not be returned to a connection pool in this
+ * case.
* </p>
- *
- * @throws IOException
- *
- * @return a TransactionCompletion that contains details about the Transaction
*/
- TransactionCompletion complete() throws IOException;
-
- /**
- * <p>
- * Cancels this transaction, indicating to the sender that the data has not been successfully received so that
- * the sender can retry or handle however is appropriate.
- * </p>
- *
- * @param explanation an explanation to tell the other party why the transaction was canceled.
- * @throws IOException
- */
- void cancel(final String explanation) throws IOException;
-
-
- /**
- * <p>
- * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes
- * the Transaction. The underlying connection should not be returned to a connection pool in this case.
- * </p>
- */
- void error();
-
-
- /**
- * Returns the current state of the Transaction.
- * @return
- * @throws IOException
- */
- TransactionState getState() throws IOException;
-
- /**
- * Returns a Communicant that represents the other side of this Transaction (i.e.,
- * the remote NiFi instance)
- * @return
- */
- Communicant getCommunicant();
-
-
- public enum TransactionState {
- /**
- * Transaction has been started but no data has been sent or received.
- */
- TRANSACTION_STARTED,
-
- /**
- * Transaction has been started and data has been sent or received.
- */
- DATA_EXCHANGED,
-
- /**
- * Data that has been transferred has been confirmed via its CRC. Transaction is
- * ready to be completed.
- */
- TRANSACTION_CONFIRMED,
-
- /**
- * Transaction has been successfully completed.
- */
- TRANSACTION_COMPLETED,
-
- /**
- * The Transaction has been canceled.
- */
- TRANSACTION_CANCELED,
-
- /**
- * The Transaction ended in an error.
- */
- ERROR;
- }
+ void error();
+
+ /**
+ * @return the current state of the Transaction.
+ * @throws IOException ioe
+ */
+ TransactionState getState() throws IOException;
+
+ /**
+ * @return a Communicant that represents the other side of this Transaction
+ * (i.e., the remote NiFi instance)
+ */
+ Communicant getCommunicant();
+
+ public enum TransactionState {
+
+ /**
+ * Transaction has been started but no data has been sent or received.
+ */
+ TRANSACTION_STARTED,
+ /**
+ * Transaction has been started and data has been sent or received.
+ */
+ DATA_EXCHANGED,
+ /**
+ * Data that has been transferred has been confirmed via its CRC.
+ * Transaction is ready to be completed.
+ */
+ TRANSACTION_CONFIRMED,
+ /**
+ * Transaction has been successfully completed.
+ */
+ TRANSACTION_COMPLETED,
+ /**
+ * The Transaction has been canceled.
+ */
+ TRANSACTION_CANCELED,
+ /**
+ * The Transaction ended in an error.
+ */
+ ERROR;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
index be5f73a..1587e87 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
@@ -21,43 +21,44 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.protocol.DataPacket;
-
/**
- * A TransactionCompletion provides information about a {@link Transaction} that has completed successfully.
+ * A TransactionCompletion provides information about a {@link Transaction} that
+ * has completed successfully.
*/
public interface TransactionCompletion {
-
+
/**
- * When a sending to a NiFi instance, the server may accept the content sent to it
- * but indicate that its queues are full and that the client should backoff sending
- * data for a bit. This method returns <code>true</code> if the server did in fact
- * request that, <code>false</code> otherwise.
- * @return
+ * When sending to a NiFi instance, the server may accept the content sent
+ * to it but indicate that its queues are full and that the client should
+ * back off sending data for a bit.
+ *
+ * @return <code>true</code> if the server did in fact request that,
+ * <code>false</code> otherwise
*/
boolean isBackoff();
-
+
/**
- * Returns the number of Data Packets that were sent to or received from the remote
- * NiFi instance in the Transaction
- * @return
+ * @return the number of Data Packets that were sent to or received from the
+ * remote NiFi instance in the Transaction
*/
int getDataPacketsTransferred();
-
+
/**
- * Returns the number of bytes of DataPacket content that were sent to or received from
- * the remote NiFI instance in the Transaction. Note that this is different than the number
- * of bytes actually transferred between the client and server, as it does not take into
- * account the attributes or protocol-specific information that is exchanged but rather
- * takes into account only the data in the {@link InputStream} of the {@link DataPacket}
- * @return
+ * @return the number of bytes of DataPacket content that were sent to or
+ * received from the remote NiFi instance in the Transaction. Note that this
+ * is different than the number of bytes actually transferred between the
+ * client and server, as it does not take into account the attributes or
+ * protocol-specific information that is exchanged but rather takes into
+ * account only the data in the {@link InputStream} of the
+ * {@link DataPacket}
*/
long getBytesTransferred();
-
+
/**
- * Returns the amount of time that the Transaction took, from the time that the Transaction
- * was created to the time that the Transaction was completed.
- * @param timeUnit
- * @return
+ * @param timeUnit unit of time for which to report the duration
+ * @return the amount of time that the Transaction took, from the time that
+ * the Transaction was created to the time that the Transaction was
+ * completed
*/
long getDuration(TimeUnit timeUnit);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
index 45029a4..979ad9c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -16,17 +16,16 @@
*/
package org.apache.nifi.remote;
-
/**
- * An enumeration for specifying the direction in which data should be transferred between a client
- * and a remote NiFi instance.
+ * An enumeration for specifying the direction in which data should be
+ * transferred between a client and a remote NiFi instance.
*/
public enum TransferDirection {
- /**
- * The client is to send data to the remote instance.
- */
+
+ /**
+ * The client is to send data to the remote instance.
+ */
SEND,
-
/**
* The client is to receive data from the remote instance.
*/