You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/22 05:39:47 UTC
[3/7] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
index 1eb5c7e..db76279 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
@@ -23,10 +23,6 @@ import java.nio.channels.SelectionKey;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-/**
- *
- * @author none
- */
public final class DatagramChannelReader extends AbstractChannelReader {
public static final int MAX_UDP_PACKET_SIZE = 65507;
@@ -39,10 +35,10 @@ public final class DatagramChannelReader extends AbstractChannelReader {
* Will receive UDP data from channel and won't receive anything unless the
* given buffer has enough space for at least one full max udp packet.
*
- * @param key
- * @param buffer
- * @return
- * @throws IOException
+ * @param key selection key
+ * @param buffer to fill
+ * @return bytes read
+ * @throws IOException if error filling buffer from channel
*/
@Override
protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
index db2c102..29c2973 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
@@ -22,10 +22,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-/**
- *
- * @author none
- */
public final class SocketChannelReader extends AbstractChannelReader {
public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
@@ -35,10 +31,10 @@ public final class SocketChannelReader extends AbstractChannelReader {
/**
* Receives TCP data from the socket channel for the given key.
*
- * @param key
- * @param buffer
- * @return
- * @throws IOException
+ * @param key selection key
+ * @param buffer byte buffer to fill
+ * @return bytes read
+ * @throws IOException if error reading bytes
*/
@Override
protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
index d75b7d7..cac8d8b 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
@@ -25,7 +25,6 @@ import org.apache.nifi.io.nio.BufferPool;
* thread providing data to process and another thread that is processing that
* data.
*
- * @author none
*/
public interface StreamConsumer {
@@ -36,7 +35,7 @@ public interface StreamConsumer {
* associated add to this given queue. If not, buffers will run out and all
* stream processing will halt. READ THIS!!!
*
- * @param returnQueue
+ * @param returnQueue pool of buffers to use
*/
void setReturnBufferQueue(BufferPool returnQueue);
@@ -45,7 +44,7 @@ public interface StreamConsumer {
* data to be processed. If the consumer is finished this should simply
* return the given buffer to the return buffer queue (after it is cleared)
*
- * @param buffer
+ * @param buffer filled buffer
*/
void addFilledBuffer(ByteBuffer buffer);
@@ -53,7 +52,8 @@ public interface StreamConsumer {
* Will be called by the thread that executes the consumption of data. May
* be called many times though once <code>isConsumerFinished</code> returns
* true this method will likely do nothing.
- * @throws java.io.IOException
+ *
+ * @throws java.io.IOException if there is an issue processing
*/
void process() throws IOException;
@@ -66,14 +66,14 @@ public interface StreamConsumer {
* If true signals the consumer is done consuming data and will not process
* any more buffers.
*
- * @return
+ * @return true if finished
*/
boolean isConsumerFinished();
/**
* Uniquely identifies the consumer
*
- * @return
+ * @return identifier of consumer
*/
String getId();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
index 7ed5ad4..9c6cb82 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
@@ -81,12 +81,12 @@ public class SSLContextFactory {
*
*
* @return a SSLContext instance
- * @throws java.security.KeyStoreException
- * @throws java.io.IOException
- * @throws java.security.NoSuchAlgorithmException
- * @throws java.security.cert.CertificateException
- * @throws java.security.UnrecoverableKeyException
- * @throws java.security.KeyManagementException
+ * @throws java.security.KeyStoreException if problem with keystore
+ * @throws java.io.IOException if unable to create context
+ * @throws java.security.NoSuchAlgorithmException if algorithm isn't known
+ * @throws java.security.cert.CertificateException if certificate is invalid
+ * @throws java.security.UnrecoverableKeyException if the key cannot be recovered
+ * @throws java.security.KeyManagementException if the key is improper
*/
public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
UnrecoverableKeyException, KeyManagementException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
index fc279fb..d6aca92 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
@@ -26,9 +26,6 @@ import java.security.cert.CertificateException;
import javax.net.ssl.SSLContext;
-/**
- * @author unattributed
- */
public final class ServerSocketConfiguration {
private boolean needClientAuth;
@@ -40,7 +37,8 @@ public final class ServerSocketConfiguration {
public ServerSocketConfiguration() {
}
- public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
+ public SSLContext createSSLContext()
+ throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
return sslContextFactory == null ? null : sslContextFactory.createSslContext();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
index c24b540..8b803dc 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
@@ -26,9 +26,6 @@ import java.security.cert.CertificateException;
import javax.net.ssl.SSLContext;
-/**
- * @author unattributed
- */
public final class SocketConfiguration {
private Integer socketTimeout;
@@ -41,7 +38,8 @@ public final class SocketConfiguration {
private Integer trafficClass;
private SSLContextFactory sslContextFactory;
- public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
+ public SSLContext createSSLContext()
+ throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException {
return sslContextFactory == null ? null : sslContextFactory.createSslContext();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
index fb6a00c..27d676a 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
@@ -35,9 +35,6 @@ import org.apache.nifi.logging.NiFiLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * @author unattributed
- */
public final class SocketUtils {
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class));
@@ -99,7 +96,8 @@ public final class SocketUtils {
return socket;
}
- public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException {
+ public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config)
+ throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException {
if (config == null) {
throw new NullPointerException("Configuration may not be null.");
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
index e562c25..1ce2ea0 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
* then the message is wrapped with a MulticastProtocolMessage before being sent
* to the originator.
*
- * @author unattributed
*/
public abstract class MulticastListener {
@@ -80,8 +79,8 @@ public abstract class MulticastListener {
* Implements the action to perform when a new datagram is received. This
* class must not close the multicast socket.
*
- * @param multicastSocket
- * @param packet the datagram socket
+ * @param multicastSocket socket
+ * @param packet the datagram packet
*/
public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
index b3d214e..447d701 100644
--- a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
+++ b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
@@ -78,7 +78,8 @@ public class TCPClient {
for (int i = 0; i < 1000; i++) {
sock.getOutputStream().write(bytes);
totalBytes += bytes.length;
- } sock.getOutputStream().flush();
+ }
+ sock.getOutputStream().flush();
}
logger.info("Total bytes sent: " + totalBytes + " to port " + port);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 5b20b93..5c8b4c8 100644
--- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -72,7 +72,7 @@ import org.slf4j.LoggerFactory;
* updates for a given Record at any one time.
* </p>
*
- * @param <T>
+ * @param <T> type of record this WAL is for
*/
public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
@@ -113,14 +113,12 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
* @param paths a sorted set of Paths to use for the partitions/journals and
* the snapshot. The snapshot will always be written to the first path
* specified.
- *
* @param partitionCount the number of partitions/journals to use. For best
* performance, this should be close to the number of threads that are
* expected to update the repository simultaneously
- *
- * @param serde
- * @param syncListener
- * @throws IOException
+ * @param serde the serializer/deserializer for records
+ * @param syncListener the listener
+ * @throws IOException if unable to initialize due to IO issue
*/
@SuppressWarnings("unchecked")
public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
@@ -209,7 +207,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
while (true) {
final int numBlackListed = numberBlackListedPartitions.get();
if (numBlackListed >= partitions.length) {
- throw new IOException("All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required.");
+ throw new IOException("All Partitions have been blacklisted due to "
+ + "failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, "
+ + "this issue may resolve itself. Otherwise, manual intervention will be required.");
}
final long partitionIdx = partitionIndex.getAndIncrement();
@@ -248,7 +248,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
} else if (updateType == UpdateType.SWAP_OUT) {
final String newLocation = serde.getLocation(record);
if (newLocation == null) {
- logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but no indicator of where the Record is to be Swapped Out to; these records may be lost when the repository is restored!");
+ logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but "
+ + "no indicator of where the Record is to be Swapped Out to; these records may be "
+ + "lost when the repository is restored!");
} else {
recordMap.remove(recordIdentifier);
this.externalLocations.add(newLocation);
@@ -256,7 +258,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
} else if (updateType == UpdateType.SWAP_IN) {
final String newLocation = serde.getLocation(record);
if (newLocation == null) {
- logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no indicator of where the Record is to be Swapped In from; these records may be duplicated when the repository is restored!");
+ logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no "
+ + "indicator of where the Record is to be Swapped In from; these records may be duplicated "
+ + "when the repository is restored!");
} else {
externalLocations.remove(newLocation);
}
@@ -345,11 +349,13 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
final int waliImplementationVersion = dataIn.readInt();
if (!waliImplementationClass.equals(MinimalLockingWriteAheadLog.class.getName())) {
- throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the " + waliImplementationClass + " class; cannot restore using " + getClass().getName());
+ throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using the "
+ + waliImplementationClass + " class; cannot restore using " + getClass().getName());
}
if (waliImplementationVersion > getVersion()) {
- throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version " + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion());
+ throw new IOException("Write-Ahead Log located at " + snapshotPath + " was written using version "
+ + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion());
}
dataIn.readUTF(); // ignore serde class name for now
@@ -380,7 +386,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
}
this.recoveredExternalLocations.addAll(swapLocations);
- logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}", new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId});
+ logger.debug("{} restored {} Records and {} Swap Files from Snapshot, ending with Transaction ID {}",
+ new Object[]{this, numRecords, recoveredExternalLocations.size(), maxTransactionId});
return maxTransactionId;
}
}
@@ -390,10 +397,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
* if recovery of a Partition requires the Write-Ahead Log be checkpointed
* before modification.
*
- * @param modifiableRecordMap
- * @param maxTransactionIdRestored
- * @return
- * @throws IOException
+ * @param modifiableRecordMap map
+ * @param maxTransactionIdRestored index of max restored transaction
+ * @throws IOException if unable to recover from edits
*/
private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Long maxTransactionIdRestored) throws IOException {
final Map<Object, T> updateMap = new HashMap<>();
@@ -422,7 +428,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
try {
partition.recoverNextTransaction(ignorableMap, updateMap, ignorableSwapLocations);
} catch (final EOFException e) {
- logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; assuming crash and ignoring this transaction.",
+ logger.error("{} unexpectedly reached End of File while reading from {} for Transaction {}; "
+ + "assuming crash and ignoring this transaction.",
new Object[]{this, partition, transactionId});
}
}
@@ -442,7 +449,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
modifiableRecordMap.remove(id);
}
} catch (final EOFException e) {
- logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
+ logger.error("{} unexpectedly reached End-of-File when reading from {} for Transaction ID {}; "
+ + "assuming crash and ignoring this transaction",
new Object[]{this, nextPartition, firstTransactionId});
}
@@ -452,7 +460,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
try {
subsequentTransactionId = nextPartition.getNextRecoverableTransactionId();
} catch (final IOException e) {
- logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; assuming crash and ignoring this transaction",
+ logger.error("{} unexpectedly found End-of-File when reading from {} for Transaction ID {}; "
+ + "assuming crash and ignoring this transaction",
new Object[]{this, nextPartition, firstTransactionId});
}
@@ -576,7 +585,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
final long partitionMillis = TimeUnit.MILLISECONDS.convert(partitionEnd - partitionStart, TimeUnit.NANOSECONDS);
final long stopTheWorldMillis = TimeUnit.NANOSECONDS.toMillis(stopTheWorldNanos);
- logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}",
+ logger.info("{} checkpointed with {} Records and {} Swap Files in {} milliseconds (Stop-the-world "
+ + "time = {} milliseconds, Clear Edit Logs time = {} millis), max Transaction ID {}",
new Object[]{this, records.size(), swapLocations.size(), millis, stopTheWorldMillis, partitionMillis, maxTransactionId});
return records.size();
@@ -605,9 +615,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
*
* All methods with the exceptions of {@link #claim()}, {@link #tryClaim()},
* and {@link #releaseClaim()} in this Partition MUST be called while
- * holding the claim (via {@link #claim} or {@link #tryClaim()).
+ * holding the claim (via {@link #claim} or {@link #tryClaim()}).
*
- * @param <S>
+ * @param <S> type of record held in the partitions
*/
private static class Partition<S> {
@@ -703,7 +713,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
* Closes resources pointing to the current journal and begins writing
* to a new one
*
- * @throws IOException
+ * @throws IOException if failure to rollover
*/
public void rollover() throws IOException {
lock.lock();
@@ -777,7 +787,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
if (isJournalFile(file)) {
paths.add(file.toPath());
} else {
- logger.warn("Found file {}, but could not access it, or it was not in the expected format; will ignore this file", file.getAbsolutePath());
+ logger.warn("Found file {}, but could not access it, or it was not in the expected format; "
+ + "will ignore this file", file.getAbsolutePath());
}
}
@@ -836,7 +847,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
return true;
}
- public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync) throws IOException {
+ public void update(final Collection<S> records, final long transactionId, final Map<Object, S> recordMap, final boolean forceSync)
+ throws IOException {
if (this.closed) {
throw new IllegalStateException("Partition is closed");
}
@@ -889,7 +901,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
final long waliVersion = recoveryIn.readInt();
if (waliVersion > writeAheadLogVersion) {
- throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
+ throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using "
+ + "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
}
@SuppressWarnings("unused")
@@ -936,7 +949,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
final Path nextRecoveryPath = this.recoveryFiles.poll();
if (nextRecoveryPath != null) {
- throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition in directory " + editDirectory);
+ throw new IllegalStateException("Signaled to end recovery, but there are more recovery files for Partition "
+ + "in directory " + editDirectory);
}
final Path newEditPath = getNewEditPath();
@@ -999,7 +1013,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
/**
* Must be called after recovery has finished
*
- * @return
+ * @return max recovered transaction id
*/
public long getMaxRecoveredTransactionId() {
return maxTransactionId.get();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
index bbc7efb..cc984a6 100644
--- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
+++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
@@ -35,10 +35,10 @@ public interface SerDe<T> {
* {@link DataOutputStream}.
* </p>
*
- * @param previousRecordState
- * @param newRecordState
- * @param out
- * @throws IOException
+ * @param previousRecordState previous state
+ * @param newRecordState new state
+ * @param out stream to write to
+ * @throws IOException if fail during write
*/
void serializeEdit(T previousRecordState, T newRecordState, DataOutputStream out) throws IOException;
@@ -48,9 +48,9 @@ public interface SerDe<T> {
* {@link DataOutputStream}.
* </p>
*
- * @param record
- * @param out
- * @throws IOException
+ * @param record to serialize
+ * @param out to write to
+ * @throws IOException if failed to write
*/
void serializeRecord(T record, DataOutputStream out) throws IOException;
@@ -63,13 +63,13 @@ public interface SerDe<T> {
* This method must never return <code>null</code>.
* </p>
*
- * @param in
+ * @param in to deserialize from
* @param currentRecordStates an unmodifiable map of Record ID's to the
* current state of that record
* @param version the version of the SerDe that was used to serialize the
* edit record
- * @return
- * @throws IOException
+ * @return deserialized record
+ * @throws IOException if failure reading
*/
T deserializeEdit(DataInputStream in, Map<Object, T> currentRecordStates, int version) throws IOException;
@@ -79,27 +79,27 @@ public interface SerDe<T> {
* record. If no data is available, returns <code>null</code>.
* </p>
*
- * @param in
+ * @param in stream to read from
* @param version the version of the SerDe that was used to serialize the
* record
- * @return
- * @throws IOException
+ * @return record
+ * @throws IOException failure reading
*/
T deserializeRecord(DataInputStream in, int version) throws IOException;
/**
* Returns the unique ID for the given record
*
- * @param record
- * @return
+ * @param record to obtain identifier for
+ * @return identifier of record
*/
Object getRecordIdentifier(T record);
/**
* Returns the UpdateType for the given record
*
- * @param record
- * @return
+ * @param record to retrieve update type for
+ * @return update type
*/
UpdateType getUpdateType(T record);
@@ -112,8 +112,8 @@ public interface SerDe<T> {
* WALI with a record of type {@link UpdateType#CREATE} that indicates a
* Location of file://tmp/external1
*
- * @param record
- * @return
+ * @param record to get location of
+ * @return location
*/
String getLocation(T record);
@@ -122,7 +122,7 @@ public interface SerDe<T> {
* when serializing/deserializing the edit logs so that if the version
* changes, we are still able to deserialize old versions
*
- * @return
+ * @return version
*/
int getVersion();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
index 4567872..7f0e828 100644
--- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
+++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
@@ -48,7 +48,7 @@ import java.util.Set;
* one partition or may allow many partitions.
* </p>
*
- * @param <T>
+ * @param <T> the type of Record this repository is for
*/
public interface WriteAheadRepository<T> {
@@ -63,7 +63,7 @@ public interface WriteAheadRepository<T> {
* to be flushed to disk. If false, the data may be stored in Operating
* System buffers, which improves performance but could cause loss of data
* if power is lost or the Operating System crashes
- * @throws IOException
+ * @throws IOException if failure to update repo
* @throws IllegalArgumentException if multiple records within the given
* Collection have the same ID, as specified by {@link Record#getId()}
* method
@@ -78,8 +78,8 @@ public interface WriteAheadRepository<T> {
* before any updates are issued to the Repository.
* </p>
*
- * @return
- * @throws IOException
+ * @return recovered records
+ * @throws IOException if failure to read from repo
* @throws IllegalStateException if any updates have been issued against
* this Repository before this method is invoked
*/
@@ -92,8 +92,8 @@ public interface WriteAheadRepository<T> {
* BEFORE {@link update}.
* </p>
*
- * @return
- * @throws IOException
+ * @return swap location
+ * @throws IOException if failure reading swap locations
*/
Set<String> getRecoveredSwapLocations() throws IOException;
@@ -107,7 +107,7 @@ public interface WriteAheadRepository<T> {
*
*
* @return the number of records that were written to the new snapshot
- * @throws java.io.IOException
+ * @throws java.io.IOException if failure during checkpoint
*/
int checkpoint() throws IOException;
@@ -116,7 +116,7 @@ public interface WriteAheadRepository<T> {
* Causes the repository to checkpoint and then close any open resources.
* </p>
*
- * @throws IOException
+ * @throws IOException if failure to shutdown cleanly
*/
void shutdown() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
index fd3c2de..bff1d62 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
@@ -29,7 +29,7 @@ public class MockControllerServiceInitializationContext extends MockControllerSe
public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier) {
this(controllerService, identifier, new MockProcessorLog(identifier, controllerService));
}
-
+
public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger) {
this.identifier = identifier;
this.logger = logger;
@@ -40,17 +40,17 @@ public class MockControllerServiceInitializationContext extends MockControllerSe
public String getIdentifier() {
return identifier;
}
-
+
@Override
public String getControllerServiceName(final String serviceIdentifier) {
- return null;
+ return null;
}
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return this;
}
-
+
@Override
public ComponentLog getLogger() {
return logger;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
index 2734440..219ee24 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceLookup.java
@@ -40,17 +40,17 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
public void removeControllerService(ControllerService service) {
final ControllerService canonical = getControllerService(service.getIdentifier());
- if ( canonical == null || canonical != service ) {
+ if (canonical == null || canonical != service) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
-
+
controllerServiceMap.remove(service.getIdentifier());
}
protected void addControllerServices(final MockControllerServiceLookup other) {
this.controllerServiceMap.putAll(other.controllerServiceMap);
}
-
+
protected ControllerServiceConfiguration getConfiguration(final String identifier) {
return controllerServiceMap.get(identifier);
}
@@ -80,7 +80,7 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return false;
}
-
+
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
final Set<String> ids = new HashSet<>();
@@ -91,10 +91,10 @@ public abstract class MockControllerServiceLookup implements ControllerServiceLo
}
return ids;
}
-
+
@Override
public String getControllerServiceName(String serviceIdentifier) {
- final ControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier);
- return status == null ? null : serviceIdentifier;
+ final ControllerServiceConfiguration status = controllerServiceMap.get(serviceIdentifier);
+ return status == null ? null : serviceIdentifier;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 1be0293..e9fb9d6 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -185,8 +185,8 @@ public class MockFlowFile implements FlowFile {
* Asserts that the content of this FlowFile is the same as the content of
* the given file
*
- * @param file
- * @throws IOException
+ * @param file to compare content against
+ * @throws IOException if fails doing IO during comparison
*/
public void assertContentEquals(final File file) throws IOException {
assertContentEquals(file.toPath());
@@ -196,8 +196,8 @@ public class MockFlowFile implements FlowFile {
* Asserts that the content of this FlowFile is the same as the content of
* the given path
*
- * @param path
- * @throws IOException
+ * @param path where to find content to compare to
+ * @throws IOException if io error occurs while comparing content
*/
public void assertContentEquals(final Path path) throws IOException {
try (final InputStream in = Files.newInputStream(path, StandardOpenOption.READ)) {
@@ -209,8 +209,8 @@ public class MockFlowFile implements FlowFile {
* Asserts that the content of this FlowFile is the same as the content of
* the given byte array
*
- * @param data
- * @throws IOException
+ * @param data the data to compare
+ * @throws IOException if an I/O error occurs while comparing the content
*/
public void assertContentEquals(final byte[] data) throws IOException {
try (final InputStream in = new ByteArrayInputStream(data)) {
@@ -236,8 +236,8 @@ public class MockFlowFile implements FlowFile {
* the given InputStream. This method closes the InputStream when it is
* finished.
*
- * @param in
- * @throws IOException
+ * @param in the stream to source comparison data from
+ * @throws IOException if any issues reading from given source
*/
public void assertContentEquals(final InputStream in) throws IOException {
int bytesRead = 0;
@@ -264,9 +264,7 @@ public class MockFlowFile implements FlowFile {
}
/**
- * Returns a copy of the the contents of the FlowFile as a byte array
- *
- * @return
+ * @return a copy of the contents of the FlowFile as a byte array
*/
public byte[] toByteArray() {
return Arrays.copyOf(this.data, this.data.length);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 6536928..20a2f7c 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -54,7 +54,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
/**
* Creates a new MockProcessContext for the given Processor
*
- * @param component
+ * @param component being mocked
*/
public MockProcessContext(final ConfigurableComponent component) {
this.component = Objects.requireNonNull(component);
@@ -73,7 +73,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
// do nothing...the service is being loaded
}
}
-
+
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
return getProperty(descriptor.getName());
@@ -107,9 +107,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
* either case, the ValidationResult is returned, indicating whether or not
* the property is valid
*
- * @param descriptor
- * @param value
- * @return
+ * @param descriptor of property to modify
+ * @param value new value
+ * @return result
*/
public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
requireNonNull(descriptor);
@@ -154,7 +154,6 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
config.setProperties(properties);
config.setAnnotationData(annotationData);
}
-
@Override
public int getMaxConcurrentTasks() {
@@ -268,10 +267,10 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
}
public Set<Relationship> getAvailableRelationships() {
- if ( !(component instanceof Processor) ) {
+ if (!(component instanceof Processor)) {
return Collections.emptySet();
}
-
+
final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships());
relationships.removeAll(unavailableRelationships);
return relationships;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 83c75c6..e9bb778 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -329,7 +329,6 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
-
@Override
public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
for (final FlowFile source : sources) {
@@ -676,11 +675,9 @@ public class MockProcessSession implements ProcessSession {
}
/**
- * Returns a List of FlowFiles in the order in which they were transferred
+ * @param relationship to get flowfiles for
+ * @return a List of FlowFiles in the order in which they were transferred
* to the given relationship
- *
- * @param relationship
- * @return
*/
public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
final Relationship procRel = new Relationship.Builder().name(relationship).build();
@@ -783,7 +780,7 @@ public class MockProcessSession implements ProcessSession {
* will capture the uuid of a certain number of source objects and may not
* capture all of them. How many it will capture is unspecified.
*
- * @param sources
+ * @param sources to inherit common attributes from
*/
private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) {
final StringBuilder parentUuidBuilder = new StringBuilder();
@@ -883,8 +880,8 @@ public class MockProcessSession implements ProcessSession {
* Assert that the number of FlowFiles transferred to the given relationship
* is equal to the given count
*
- * @param relationship
- * @param count
+ * @param relationship to validate transfer count of
+ * @param count expected number of FlowFiles transferred to the given relationship
*/
public void assertTransferCount(final Relationship relationship, final int count) {
final int transferCount = getFlowFilesForRelationship(relationship).size();
@@ -896,8 +893,8 @@ public class MockProcessSession implements ProcessSession {
* Assert that the number of FlowFiles transferred to the given relationship
* is equal to the given count
*
- * @param relationship
- * @param count
+ * @param relationship to validate transfer count of
+ * @param count expected number of FlowFiles transferred to the given relationship
*/
public void assertTransferCount(final String relationship, final int count) {
assertTransferCount(new Relationship.Builder().name(relationship).build(), count);
@@ -921,7 +918,7 @@ public class MockProcessSession implements ProcessSession {
* Asserts that all FlowFiles that were transferred were transferred to the
* given relationship
*
- * @param relationship
+ * @param relationship to check for transferred flow files
*/
public void assertAllFlowFilesTransferred(final String relationship) {
assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build());
@@ -931,7 +928,7 @@ public class MockProcessSession implements ProcessSession {
* Asserts that all FlowFiles that were transferred were transferred to the
* given relationship
*
- * @param relationship
+ * @param relationship to validate
*/
public void assertAllFlowFilesTransferred(final Relationship relationship) {
for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) {
@@ -956,8 +953,8 @@ public class MockProcessSession implements ProcessSession {
* given relationship and that the number of FlowFiles transferred is equal
* to <code>count</code>
*
- * @param relationship
- * @param count
+ * @param relationship to validate
+ * @param count number of items sent to that relationship (expected)
*/
public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) {
assertAllFlowFilesTransferred(relationship);
@@ -969,17 +966,15 @@ public class MockProcessSession implements ProcessSession {
* given relationship and that the number of FlowFiles transferred is equal
* to <code>count</code>
*
- * @param relationship
- * @param count
+ * @param relationship to validate
+ * @param count number of items sent to that relationship (expected)
*/
public void assertAllFlowFilesTransferred(final String relationship, final int count) {
assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build(), count);
}
/**
- * Returns the number of FlowFiles that were removed
- *
- * @return
+ * @return the number of FlowFiles that were removed
*/
public int getRemovedCount() {
return removedCount;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
index 0aa2749..2e5d3eb 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
@@ -64,9 +64,9 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
@Override
public String getControllerServiceName(String serviceIdentifier) {
- return context.getControllerServiceName(serviceIdentifier);
+ return context.getControllerServiceName(serviceIdentifier);
}
-
+
@Override
public boolean isControllerServiceEnabled(String serviceIdentifier) {
return context.isControllerServiceEnabled(serviceIdentifier);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java
index 5505e88..837784b 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java
@@ -66,21 +66,11 @@ public class MockProcessorLog implements ProcessorLog {
return (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable));
}
- /**
- *
- * @param msg
- * @param t
- */
@Override
public void warn(final String msg, final Throwable t) {
warn("{} " + msg, new Object[]{component}, t);
}
- /**
- *
- * @param msg
- * @param os
- */
@Override
public void warn(String msg, Object[] os) {
if (lastArgIsException(os)) {
@@ -92,12 +82,6 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @param msg
- * @param os
- * @param t
- */
@Override
public void warn(String msg, Object[] os, final Throwable t) {
os = addProcessorAndThrowable(os, t);
@@ -109,21 +93,12 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @param msg
- */
@Override
public void warn(String msg) {
msg = "{} " + msg;
logger.warn(msg, component);
}
- /**
- *
- * @param msg
- * @param t
- */
@Override
public void trace(String msg, Throwable t) {
msg = "{} " + msg;
@@ -131,11 +106,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.trace(msg, os, t);
}
- /**
- *
- * @param msg
- * @param os
- */
@Override
public void trace(String msg, Object[] os) {
msg = "{} " + msg;
@@ -143,10 +113,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.trace(msg, os);
}
- /**
- *
- * @param msg
- */
@Override
public void trace(String msg) {
msg = "{} " + msg;
@@ -154,12 +120,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.trace(msg, os);
}
- /**
- *
- * @param msg
- * @param os
- * @param t
- */
@Override
public void trace(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
@@ -169,56 +129,31 @@ public class MockProcessorLog implements ProcessorLog {
logger.trace("", t);
}
- /**
- *
- * @return
- */
@Override
public boolean isWarnEnabled() {
return logger.isWarnEnabled();
}
- /**
- *
- * @return
- */
@Override
public boolean isTraceEnabled() {
return logger.isTraceEnabled();
}
- /**
- *
- * @return
- */
@Override
public boolean isInfoEnabled() {
return logger.isInfoEnabled();
}
- /**
- *
- * @return
- */
@Override
public boolean isErrorEnabled() {
return logger.isErrorEnabled();
}
- /**
- *
- * @return
- */
@Override
public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}
- /**
- *
- * @param msg
- * @param t
- */
@Override
public void info(String msg, Throwable t) {
msg = "{} " + msg;
@@ -230,11 +165,6 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @param msg
- * @param os
- */
@Override
public void info(String msg, Object[] os) {
msg = "{} " + msg;
@@ -243,10 +173,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.info(msg, os);
}
- /**
- *
- * @param msg
- */
@Override
public void info(String msg) {
msg = "{} " + msg;
@@ -255,12 +181,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.info(msg, os);
}
- /**
- *
- * @param msg
- * @param os
- * @param t
- */
@Override
public void info(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
@@ -272,20 +192,11 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @return
- */
@Override
public String getName() {
return logger.getName();
}
- /**
- *
- * @param msg
- * @param t
- */
@Override
public void error(String msg, Throwable t) {
msg = "{} " + msg;
@@ -297,11 +208,6 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @param msg
- * @param os
- */
@Override
public void error(String msg, Object[] os) {
if (lastArgIsException(os)) {
@@ -313,10 +219,6 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @param msg
- */
@Override
public void error(String msg) {
msg = "{} " + msg;
@@ -325,12 +227,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.error(msg, os);
}
- /**
- *
- * @param msg
- * @param os
- * @param t
- */
@Override
public void error(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
@@ -342,11 +238,6 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @param msg
- * @param t
- */
@Override
public void debug(String msg, Throwable t) {
msg = "{} " + msg;
@@ -355,11 +246,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.debug(msg, os, t);
}
- /**
- *
- * @param msg
- * @param os
- */
@Override
public void debug(String msg, Object[] os) {
os = addProcessor(os);
@@ -368,12 +254,6 @@ public class MockProcessorLog implements ProcessorLog {
logger.debug(msg, os);
}
- /**
- *
- * @param msg
- * @param os
- * @param t
- */
@Override
public void debug(String msg, Object[] os, Throwable t) {
os = addProcessorAndThrowable(os, t);
@@ -385,10 +265,6 @@ public class MockProcessorLog implements ProcessorLog {
}
}
- /**
- *
- * @param msg
- */
@Override
public void debug(String msg) {
msg = "{} " + msg;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
index 3451f12..097eafd 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
@@ -88,7 +88,7 @@ public class MockProvenanceReporter implements ProvenanceReporter {
public void fork(FlowFile parent, java.util.Collection<FlowFile> children, String details, long forkDuration) {
}
-
+
@Override
public void join(Collection<FlowFile> parents, FlowFile child) {
@@ -108,7 +108,7 @@ public class MockProvenanceReporter implements ProvenanceReporter {
public void join(java.util.Collection<FlowFile> parents, FlowFile child, String details, long joinDuration) {
}
-
+
@Override
public void clone(FlowFile parent, FlowFile child) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
index ca4350c..63a9876 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
@@ -100,11 +100,9 @@ public class MockReportingContext extends MockControllerServiceLookup implements
}
/**
- * Returns all Bulletins that have been created for the component with the
+ * @param componentId identifier of component to get bulletins for
+ * @return all Bulletins that have been created for the component with the
* given ID
- *
- * @param componentId
- * @return
*/
public List<Bulletin> getComponentBulletins(final String componentId) {
final List<Bulletin> created = componentBulletinsCreated.get(componentId);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
index 7cabef2..0aea00a 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java
@@ -81,7 +81,7 @@ public class MockReportingInitializationContext extends MockControllerServiceLoo
public SchedulingStrategy getSchedulingStrategy() {
return SchedulingStrategy.TIMER_DRIVEN;
}
-
+
@Override
public ComponentLog getLogger() {
return logger;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
index c9b1cda..d73a09b 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
@@ -38,10 +38,10 @@ public class MockValidationContext implements ValidationContext, ControllerServi
public MockValidationContext(final MockProcessContext processContext) {
this.context = processContext;
-
+
final Map<PropertyDescriptor, String> properties = processContext.getProperties();
expressionLanguageSupported = new HashMap<>(properties.size());
- for ( final PropertyDescriptor descriptor : properties.keySet() ) {
+ for (final PropertyDescriptor descriptor : properties.keySet()) {
expressionLanguageSupported.put(descriptor.getName(), descriptor.isExpressionLanguageSupported());
}
}
@@ -101,13 +101,13 @@ public class MockValidationContext implements ValidationContext, ControllerServi
public boolean isControllerServiceEnabled(final ControllerService service) {
return context.isControllerServiceEnabled(service);
}
-
+
@Override
public String getControllerServiceName(final String serviceIdentifier) {
- final ControllerServiceConfiguration configuration = context.getConfiguration(serviceIdentifier);
- return configuration == null ? null : serviceIdentifier;
+ final ControllerServiceConfiguration configuration = context.getConfiguration(serviceIdentifier);
+ return configuration == null ? null : serviceIdentifier;
}
-
+
@Override
public boolean isValidationRequired(final ControllerService service) {
return true;
@@ -117,16 +117,16 @@ public class MockValidationContext implements ValidationContext, ControllerServi
public boolean isControllerServiceEnabling(String serviceIdentifier) {
return context.isControllerServiceEnabling(serviceIdentifier);
}
-
+
public boolean isExpressionLanguagePresent(final String value) {
- if ( value == null ) {
+ if (value == null) {
return false;
}
-
+
final List<Range> elRanges = Query.extractExpressionRanges(value);
return (elRanges != null && !elRanges.isEmpty());
}
-
+
@Override
public boolean isExpressionLanguageSupported(final String propertyName) {
final Boolean supported = expressionLanguageSupported.get(propertyName);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java
index 9d52eb3..940eeea 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java
@@ -34,14 +34,15 @@ public class ReflectionUtils {
* are supplied by the <code>args</code> parameter than needed, the extra
* arguments will be ignored.
*
- * @param annotation
- * @param instance
- * @param args
- * @throws InvocationTargetException
- * @throws IllegalArgumentException
- * @throws IllegalAccessException
+ * @param annotation the annotation to look for
+ * @param instance to invoke a method of
+ * @param args to supply in a method call
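+ * @throws InvocationTargetException if an invoked method itself throws an exception
+ * @throws IllegalArgumentException if the supplied arguments are not valid for an invoked method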
+ * @throws IllegalAccessException if not allowed to invoke that method
*/
- public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+ public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args)
+ throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
for (final Method method : instance.getClass().getMethods()) {
if (method.isAnnotationPresent(annotation)) {
final boolean isAccessible = method.isAccessible();
@@ -90,9 +91,9 @@ public class ReflectionUtils {
* are supplied by the <code>args</code> parameter than needed, the extra
* arguments will be ignored.
*
- * @param annotation
- * @param instance
- * @param args
+ * @param annotation the annotation to look for
+ * @param instance to invoke a method of
+ * @param args to supply in a method call
* @return <code>true</code> if all appropriate methods were invoked and
* returned without throwing an Exception, <code>false</code> if one of the
* methods threw an Exception or could not be invoked; if <code>false</code>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
index 13a87de..65d79a6 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
@@ -32,7 +32,6 @@ public class SharedSessionState {
private final AtomicLong flowFileIdGenerator;
private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
-
public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) {
flowFileQueue = new MockFlowFileQueue();
provenanceReporter = new MockProvenanceReporter();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index d66ed81..7048cfe 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -86,14 +86,14 @@ public class StandardProcessorTestRunner implements TestRunner {
private static final Logger logger = LoggerFactory.getLogger(StandardProcessorTestRunner.class);
private static final Set<Class<? extends Annotation>> deprecatedTypeAnnotations = new HashSet<>();
private static final Set<Class<? extends Annotation>> deprecatedMethodAnnotations = new HashSet<>();
-
+
static {
// do this in a separate method, just so that we can add a @SuppressWarnings annotation
// because we want to indicate explicitly that we know that we are using deprecated
// classes here.
populateDeprecatedMethods();
}
-
+
StandardProcessorTestRunner(final Processor processor) {
this.processor = processor;
this.idGenerator = new AtomicLong(0L);
@@ -103,7 +103,7 @@ public class StandardProcessorTestRunner implements TestRunner {
this.context = new MockProcessContext(processor);
detectDeprecatedAnnotations(processor);
-
+
final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
processor.initialize(mockInitContext);
@@ -126,7 +126,7 @@ public class StandardProcessorTestRunner implements TestRunner {
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerSerially.class);
-
+
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnRemoved.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnAdded.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnScheduled.class);
@@ -134,24 +134,24 @@ public class StandardProcessorTestRunner implements TestRunner {
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnStopped.class);
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnUnscheduled.class);
}
-
+
private static void detectDeprecatedAnnotations(final Processor processor) {
- for ( final Class<? extends Annotation> annotationClass : deprecatedTypeAnnotations ) {
- if ( processor.getClass().isAnnotationPresent(annotationClass) ) {
+ for (final Class<? extends Annotation> annotationClass : deprecatedTypeAnnotations) {
+ if (processor.getClass().isAnnotationPresent(annotationClass)) {
Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName());
}
}
-
- for ( final Class<? extends Annotation> annotationClass : deprecatedMethodAnnotations ) {
- for ( final Method method : processor.getClass().getMethods() ) {
- if ( method.isAnnotationPresent(annotationClass) ) {
+
+ for (final Class<? extends Annotation> annotationClass : deprecatedMethodAnnotations) {
+ for (final Method method : processor.getClass().getMethods()) {
+ if (method.isAnnotationPresent(annotationClass)) {
Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName() + " for method " + method);
}
}
}
-
+
}
-
+
@Override
public void setValidateExpressionUsage(final boolean validate) {
context.setValidateExpressionUsage(validate);
@@ -181,7 +181,7 @@ public class StandardProcessorTestRunner implements TestRunner {
public void run(final int iterations, final boolean stopOnFinish) {
run(iterations, stopOnFinish, true);
}
-
+
@Override
public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) {
if (iterations < 1) {
@@ -191,7 +191,7 @@ public class StandardProcessorTestRunner implements TestRunner {
context.assertValid();
context.enableExpressionValidation();
try {
- if ( initialize ) {
+ if (initialize) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
} catch (Exception e) {
@@ -519,7 +519,7 @@ public class StandardProcessorTestRunner implements TestRunner {
// Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method);
// }
// }
-
+
final ComponentLog logger = new MockProcessorLog(identifier, service);
final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger);
service.initialize(initContext);
@@ -538,66 +538,64 @@ public class StandardProcessorTestRunner implements TestRunner {
context.addControllerService(identifier, service, resolvedProps, null);
}
-
@Override
public void assertNotValid(final ControllerService service) {
final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service);
final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext);
-
- for ( final ValidationResult result : results ) {
- if ( !result.isValid() ) {
+
+ for (final ValidationResult result : results) {
+ if (!result.isValid()) {
return;
}
}
-
+
Assert.fail("Expected Controller Service " + service + " to be invalid but it is valid");
}
-
+
@Override
public void assertValid(final ControllerService service) {
final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service);
final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext);
-
- for ( final ValidationResult result : results ) {
- if ( !result.isValid() ) {
+
+ for (final ValidationResult result : results) {
+ if (!result.isValid()) {
Assert.fail("Expected Controller Service to be valid but it is invalid due to: " + result.toString());
}
}
}
-
-
+
@Override
public void disableControllerService(final ControllerService service) {
final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier());
- if ( configuration == null ) {
+ if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
-
- if ( !configuration.isEnabled() ) {
+
+ if (!configuration.isEnabled()) {
throw new IllegalStateException("Controller service " + service + " cannot be disabled because it is not enabled");
}
-
+
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail("Failed to disable Controller Service " + service + " due to " + e);
}
-
+
configuration.setEnabled(false);
}
-
+
@Override
public void enableControllerService(final ControllerService service) {
final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier());
- if ( configuration == null ) {
+ if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
-
- if ( configuration.isEnabled() ) {
+
+ if (configuration.isEnabled()) {
throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled");
}
-
+
try {
final ConfigurationContext configContext = new MockConfigurationContext(configuration.getProperties(), context);
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext);
@@ -609,87 +607,86 @@ public class StandardProcessorTestRunner implements TestRunner {
Assert.fail("Failed to enable Controller Service " + service + " due to " + e);
}
- configuration.setEnabled(true);
+ configuration.setEnabled(true);
}
-
+
@Override
public boolean isControllerServiceEnabled(final ControllerService service) {
final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier());
- if ( configuration == null ) {
+ if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
return configuration.isEnabled();
}
-
+
@Override
public void removeControllerService(final ControllerService service) {
disableControllerService(service);
-
+
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, service);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail("Failed to remove Controller Service " + service + " due to " + e);
}
-
+
context.removeControllerService(service);
}
-
+
@Override
public void setAnnotationData(final ControllerService service, final String annotationData) {
final ControllerServiceConfiguration configuration = getConfigToUpdate(service);
configuration.setAnnotationData(annotationData);
}
-
+
private ControllerServiceConfiguration getConfigToUpdate(final ControllerService service) {
final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier());
- if ( configuration == null ) {
+ if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
}
-
- if ( configuration.isEnabled() ) {
+
+ if (configuration.isEnabled()) {
throw new IllegalStateException("Controller service " + service + " cannot be modified because it is not disabled");
}
-
+
return configuration;
}
-
+
@Override
public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final AllowableValue value) {
return setProperty(service, property, value.getValue());
}
-
+
@Override
public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final String value) {
final ControllerServiceConfiguration configuration = getConfigToUpdate(service);
final Map<PropertyDescriptor, String> curProps = configuration.getProperties();
final Map<PropertyDescriptor, String> updatedProps = new HashMap<>(curProps);
-
+
final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service);
final ValidationResult validationResult = property.validate(value, validationContext);
-
+
updatedProps.put(property, value);
configuration.setProperties(updatedProps);
-
+
return validationResult;
}
-
+
@Override
public ValidationResult setProperty(final ControllerService service, final String propertyName, final String value) {
final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName);
- if ( descriptor == null ) {
+ if (descriptor == null) {
return new ValidationResult.Builder()
- .input(propertyName)
- .explanation(propertyName + " is not a known Property for Controller Service " + service)
- .subject("Invalid property")
- .valid(false)
- .build();
+ .input(propertyName)
+ .explanation(propertyName + " is not a known Property for Controller Service " + service)
+ .subject("Invalid property")
+ .valid(false)
+ .build();
}
return setProperty(service, descriptor, value);
}
-
-
+
@Override
public ControllerService getControllerService(final String identifier) {
return context.getControllerService(identifier);