You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/22 17:46:47 UTC
[04/11] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 8deda3c..65756f4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -103,7 +103,7 @@ public class VolatileContentRepository implements ContentRepository {
private final ConcurrentMap<ContentClaim, ContentClaim> backupRepoClaimMap = new ConcurrentHashMap<>(256);
private final AtomicReference<ContentRepository> backupRepositoryRef = new AtomicReference<>(null);
- private ContentClaimManager claimManager; // effectively final
+ private ContentClaimManager claimManager; // effectively final
public VolatileContentRepository() {
this(NiFiProperties.getInstance());
@@ -137,7 +137,7 @@ public class VolatileContentRepository implements ContentRepository {
public void initialize(final ContentClaimManager claimManager) {
this.claimManager = claimManager;
}
-
+
@Override
public void shutdown() {
executor.shutdown();
@@ -147,7 +147,7 @@ public class VolatileContentRepository implements ContentRepository {
* Specifies a Backup Repository where data should be written if this
* Repository fills up
*
- * @param backup
+ * @param backup repo backup
*/
public void setBackupRepository(final ContentRepository backup) {
final boolean updated = backupRepositoryRef.compareAndSet(null, backup);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index a7020a6..9e429d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -34,7 +34,7 @@ import org.apache.nifi.controller.repository.claim.ContentClaimManager;
public class VolatileFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L);
- private ContentClaimManager claimManager; // effectively final
+ private ContentClaimManager claimManager; // effectively final
@Override
public void initialize(final ContentClaimManager claimManager) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 292c258..0779c4d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -102,7 +102,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// synced with disk.
//
// This is required due to the following scenario, which could exist if we did not do this:
- //
+ //
// A Processor modifies a FlowFile (whose content is in ContentClaim A), writing the new content to ContentClaim B.
// The processor removes ContentClaim A, which deletes the backing file.
// The FlowFile Repository writes out this change but has not yet synced the update to disk.
@@ -112,12 +112,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// ContentClaim A does not exist anymore because the Session Commit destroyed the data.
// This results in Data Loss!
// However, the comment in the class's JavaDocs regarding sync'ing should also be considered.
- //
+ //
// In order to avoid this, instead of destroying ContentClaim A, the ProcessSession puts the claim on the Claim Destruction Queue.
// We periodically force a sync of the FlowFile Repository to the backing storage mechanism.
// We can then destroy the data. If we end up syncing the FlowFile Repository to the backing storage mechanism and then restart
// before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed
- // on restart.
+ // on restart.
private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>();
public WriteAheadFlowFileRepository() {
@@ -129,7 +129,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
flowFileRepositoryPath = properties.getFlowFileRepositoryPath();
numPartitions = properties.getFlowFileRepositoryPartitions();
checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
-
+
checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
}
@@ -267,9 +267,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
* the specified Swap File and returns the number of FlowFiles that were
* persisted.
*
- * @param queue
- * @param swapLocation
- * @throws IOException
+ * @param queue queue to swap out
+ * @param swapLocation location to swap to
+ * @throws IOException ioe
*/
@Override
public void swapFlowFilesOut(final List<FlowFileRecord> swappedOut, final FlowFileQueue queue, final String swapLocation) throws IOException {
@@ -289,14 +289,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation});
}
- /**
- * Swaps FlowFiles into memory space from the given Swap File
- *
- * @param swapLocation
- * @param swapRecords
- * @param queue
- * @throws IOException
- */
@Override
public void swapFlowFilesIn(final String swapLocation, final List<FlowFileRecord> swapRecords, final FlowFileQueue queue) throws IOException {
final List<RepositoryRecord> repoRecords = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index c43f3fe..54a1b2c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* <p>
* Must be thread safe</p>
*
- * @author none
*/
public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
@@ -38,14 +37,6 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
private final AtomicInteger claimantCount = new AtomicInteger(0);
private final int hashCode;
- /**
- * Constructs a content claim
- *
- * @param container
- * @param section
- * @param id
- * @param lossTolerant
- */
StandardContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
this.container = container.intern();
this.section = section.intern();
@@ -100,7 +91,7 @@ public final class StandardContentClaim implements ContentClaim, Comparable<Cont
* Provides the natural ordering for ContentClaim objects. By default they
* are sorted by their id, then container, then section
*
- * @param other
+ * @param other other claim
* @return x such that x <= -1 if this is less than other;
* x=0 if this.equals(other);
* x >= 1 if this is greater than other
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
index 43bbb5a..b68f95e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
@@ -34,16 +34,6 @@ public class StandardContentClaimManager implements ContentClaimManager {
private static final BlockingQueue<ContentClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
- /**
- * Creates a new Content Claim with the given id, container, section, and
- * loss tolerance.
- *
- * @param id
- * @param container
- * @param section
- * @param lossTolerant
- * @return
- */
@Override
public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) {
return new StandardContentClaim(container, section, id, lossTolerant);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
index f7da136..4e727e9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
@@ -40,9 +40,7 @@ public class ByteCountingOutputStream extends OutputStream {
write(b, 0, b.length);
}
- ;
-
- @Override
+ @Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
bytesWrittenHolder.increment(len);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
index f349887..a710070 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/FlowFileAccessInputStream.java
@@ -56,10 +56,8 @@ public class FlowFileAccessInputStream extends FilterInputStream {
}
/**
- * Returns the ContentNotFoundException that was thrown by this stream, or
- * <code>null</code> if no such Exception was thrown.
- *
- * @return
+ * @return the ContentNotFoundException that was thrown by this stream, or
+ * <code>null</code> if no such Exception was thrown
*/
public ContentNotFoundException getContentNotFoundException() {
return thrown;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index acb3a01..01285b0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -179,14 +179,14 @@ public class ConnectableProcessContext implements ProcessContext {
@Override
public Set<Relationship> getAvailableRelationships() {
- for ( final Connection connection : connectable.getConnections() ) {
- if ( connection.getFlowFileQueue().isFull() ) {
+ for (final Connection connection : connectable.getConnections()) {
+ if (connection.getFlowFileQueue().isFull()) {
return Collections.emptySet();
}
}
-
+
final Collection<Relationship> relationships = connectable.getRelationships();
- if ( relationships instanceof Set ) {
+ if (relationships instanceof Set) {
return (Set<Relationship>) relationships;
}
return new HashSet<>(connectable.getRelationships());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 7455bf8..77ae686 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -65,7 +65,8 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
- public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController, final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
+ public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController,
+ final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
this.flowEngine = flowEngine;
this.controllerServiceProvider = flowController;
this.workerQueue = workerQueue;
@@ -265,15 +266,15 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
final int newThreadCount = scheduleState.incrementActiveThreadCount();
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
- // its possible that the worker queue could give us a worker node that is eligible to run based
- // on the number of threads but another thread has already incremented the thread count, result in
- // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
- // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
- // result in using more than the maximum number of defined threads
- scheduleState.decrementActiveThreadCount();
- return;
+ // its possible that the worker queue could give us a worker node that is eligible to run based
+ // on the number of threads but another thread has already incremented the thread count, result in
+ // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
+ // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+ // result in using more than the maximum number of defined threads
+ scheduleState.decrementActiveThreadCount();
+ return;
}
-
+
try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
worker.onTrigger(processContext, sessionFactory);
@@ -302,18 +303,19 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
}
}
- private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
+ private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState,
+ final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
final int newThreadCount = scheduleState.incrementActiveThreadCount();
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
- // its possible that the worker queue could give us a worker node that is eligible to run based
- // on the number of threads but another thread has already incremented the thread count, result in
- // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
- // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
- // result in using more than the maximum number of defined threads
- scheduleState.decrementActiveThreadCount();
- return;
+ // its possible that the worker queue could give us a worker node that is eligible to run based
+ // on the number of threads but another thread has already incremented the thread count, result in
+ // reaching the maximum number of threads. we won't know this until we atomically increment the thread count
+ // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
+ // result in using more than the maximum number of defined threads
+ scheduleState.decrementActiveThreadCount();
+ return;
}
-
+
try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
worker.onTrigger(processContext, sessionFactory);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 3355e73..4278cee 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -135,7 +135,7 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
final Callable<Boolean> continuallyRunTask;
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
-
+
final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext);
continuallyRunTask = runnableTask;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
index ff17912..cb7f55f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
@@ -28,7 +28,7 @@ public class ScheduleState {
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final AtomicBoolean scheduled = new AtomicBoolean(false);
- private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>();
+ private final Set<ScheduledFuture<?>> futures = new HashSet<>();
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1;
@@ -78,7 +78,7 @@ public class ScheduleState {
* Establishes the list of relevant futures for this processor. Replaces any
* previously held futures.
*
- * @param newFutures
+ * @param newFutures futures
*/
public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) {
futures.clear();
@@ -89,7 +89,7 @@ public class ScheduleState {
futures.remove(oldFuture);
futures.add(newFuture);
}
-
+
public synchronized Set<ScheduledFuture<?>> getFutures() {
return Collections.unmodifiableSet(futures);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 7725823..bb565cb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -147,12 +147,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
LOG.error("", t);
}
}
-
+
frameworkTaskExecutor.shutdown();
componentLifeCycleThreadPool.shutdown();
}
-
@Override
public void schedule(final ReportingTaskNode taskNode) {
final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
@@ -184,13 +183,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
-
+
break;
} catch (final Exception e) {
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-
+
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
try {
@@ -208,20 +207,19 @@ public final class StandardProcessScheduler implements ProcessScheduler {
taskNode.setScheduledState(ScheduledState.RUNNING);
}
-
@Override
public void unschedule(final ReportingTaskNode taskNode) {
final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
if (!scheduleState.isScheduled()) {
return;
}
-
+
taskNode.verifyCanStop();
final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
final ReportingTask reportingTask = taskNode.getReportingTask();
scheduleState.setScheduled(false);
taskNode.setScheduledState(ScheduledState.STOPPED);
-
+
final Runnable unscheduleReportingTaskRunnable = new Runnable() {
@SuppressWarnings("deprecation")
@Override
@@ -240,7 +238,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
reportingTask, cause.toString(), administrativeYieldDuration);
LOG.error("", cause);
-
+
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
@@ -293,32 +291,32 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
final Set<String> serviceIds = new HashSet<>();
- for ( final PropertyDescriptor descriptor : processContext.getProperties().keySet() ) {
+ for (final PropertyDescriptor descriptor : processContext.getProperties().keySet()) {
final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition();
- if ( serviceDefinition != null ) {
+ if (serviceDefinition != null) {
final String serviceId = processContext.getProperty(descriptor).getValue();
- if ( serviceId != null ) {
- serviceIds.add(serviceId);
+ if (serviceId != null) {
+ serviceIds.add(serviceId);
}
}
}
-
- attemptOnScheduled: while (true) {
+
+ attemptOnScheduled:
+ while (true) {
try {
synchronized (scheduleState) {
- for ( final String serviceId : serviceIds ) {
+ for (final String serviceId : serviceIds) {
final boolean enabled = processContext.isControllerServiceEnabled(serviceId);
- if ( !enabled ) {
+ if (!enabled) {
LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode);
Thread.sleep(administrativeYieldMillis);
continue attemptOnScheduled;
}
}
-
+
// if no longer scheduled to run, then we're finished. This can happen, for example,
- // if the @OnScheduled method throws an Exception and the user stops the processor
+ // if the @OnScheduled method throws an Exception and the user stops the processor
// while we're administratively yielded.
- //
// we also check if the schedule state's last start time is equal to what it was before.
// if not, then it means that the processor has been stopped and started again, so we should just
// bail; another thread will be responsible for invoking the @OnScheduled methods.
@@ -363,12 +361,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
componentLifeCycleThreadPool.execute(startProcRunnable);
}
- /**
- * Used to delay scheduling the given Processor to run until its yield
- * duration expires.
- *
- * @param procNode
- */
@Override
public void yield(final ProcessorNode procNode) {
// This exists in the ProcessScheduler so that the scheduler can take advantage of the fact that
@@ -381,7 +373,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
// the context. If this Processor has X number of threads, we end up submitting X new tasks while the previous
// X-1 tasks are still running. At this point, another thread could finish and do the same thing, resulting in
// an additional X-1 extra tasks being submitted.
- //
+ //
// As a result, we simply removed this buggy implementation, as it was a very minor performance optimization
// that gave very bad results.
}
@@ -431,24 +423,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
getSchedulingAgent(worker).onEvent(worker);
}
- /**
- * Returns the number of threads that are currently active for the given
- * <code>Connectable</code>.
- *
- * @return
- */
@Override
public int getActiveThreadCount(final Object scheduled) {
return getScheduleState(scheduled).getActiveThreadCount();
}
- /**
- * Begins scheduling the given port to run.
- *
- * @throws NullPointerException if the Port is null
- * @throws IllegalStateException if the Port is already scheduled to run or
- * has threads running
- */
@Override
public void startPort(final Port port) {
if (!port.isValid()) {
@@ -501,7 +480,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (!state.isScheduled()) {
return;
}
-
+
state.setScheduled(false);
getSchedulingAgent(connectable).unschedule(connectable, state);
@@ -561,7 +540,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (procNode.getScheduledState() != ScheduledState.DISABLED) {
throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
}
-
+
procNode.setScheduledState(ScheduledState.STOPPED);
}
@@ -570,21 +549,22 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (procNode.getScheduledState() != ScheduledState.STOPPED) {
throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
}
-
+
procNode.setScheduledState(ScheduledState.DISABLED);
}
public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
- if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+ if (taskNode.getScheduledState() != ScheduledState.DISABLED) {
throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
}
taskNode.setScheduledState(ScheduledState.STOPPED);
}
-
+
public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
- if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) {
- throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
+ if (taskNode.getScheduledState() != ScheduledState.STOPPED) {
+ throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState()
+ + " but transition to DISABLED state is allowed only from the STOPPED state");
}
taskNode.setScheduledState(ScheduledState.DISABLED);
@@ -597,12 +577,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
/**
- * Returns the ScheduleState that is registered for the given component;
- * if no ScheduleState current is registered, one is created and registered
+ * Returns the ScheduleState that is registered for the given component; if
+ * no ScheduleState is currently registered, one is created and registered
* atomically, and then that value is returned.
*
- * @param schedulable
- * @return
+ * @param schedulable the component whose ScheduleState is requested
+ * @return the ScheduleState registered for the given component
*/
private ScheduleState getScheduleState(final Object schedulable) {
ScheduleState scheduleState = scheduleStates.get(schedulable);
@@ -620,21 +600,21 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public void enableControllerService(final ControllerServiceNode service) {
service.setState(ControllerServiceState.ENABLING);
final ScheduleState scheduleState = getScheduleState(service);
-
+
final Runnable enableRunnable = new Runnable() {
@Override
public void run() {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
long lastStopTime = scheduleState.getLastStopTime();
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
-
+
while (true) {
try {
synchronized (scheduleState) {
// if no longer enabled, then we're finished. This can happen, for example,
// if the @OnEnabled method throws an Exception and the user disables the service
// while we're administratively yielded.
- //
+ //
// we also check if the schedule state's last stop time is equal to what it was before.
// if not, then means that the service has been disabled and enabled again, so we should just
// bail; another thread will be responsible for invoking the @OnEnabled methods.
@@ -649,11 +629,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
} catch (final Exception e) {
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
-
+
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
@@ -666,15 +646,15 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final Throwable cause = (t instanceof InvocationTargetException) ? t.getCause() : t;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-
+
LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
}
}
};
-
+
scheduleState.setScheduled(true);
componentLifeCycleThreadPool.execute(enableRunnable);
}
@@ -682,7 +662,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void disableControllerService(final ControllerServiceNode service) {
service.verifyCanDisable();
-
+
final ScheduleState state = getScheduleState(requireNonNull(service));
final Runnable disableRunnable = new Runnable() {
@Override
@@ -693,8 +673,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
-
- while(true) {
+
+ while (true) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
heartbeater.heartbeat();
@@ -704,17 +684,18 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
-
+
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
- if ( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
-
+
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
try {
Thread.sleep(administrativeYieldMillis);
- } catch (final InterruptedException ie) {}
-
+ } catch (final InterruptedException ie) {
+ }
+
continue;
}
}
@@ -723,6 +704,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
};
service.setState(ControllerServiceState.DISABLING);
- componentLifeCycleThreadPool.execute(disableRunnable);
+ componentLifeCycleThreadPool.execute(disableRunnable);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index f3eecbd..c4e6609 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -42,9 +42,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TimerDrivenSchedulingAgent implements SchedulingAgent {
+
private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
private final long noWorkYieldNanos;
-
+
private final FlowController flowController;
private final FlowEngine flowEngine;
private final ProcessContextFactory contextFactory;
@@ -57,7 +58,7 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
this.flowEngine = flowEngine;
this.contextFactory = contextFactory;
this.encryptor = encryptor;
-
+
final String boredYieldDuration = NiFiProperties.getInstance().getBoredYieldDuration();
try {
noWorkYieldNanos = FormatUtils.getTimeDuration(boredYieldDuration, TimeUnit.NANOSECONDS);
@@ -84,31 +85,30 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
logger.info("{} started.", taskNode.getReportingTask());
}
-
@Override
public void schedule(final Connectable connectable, final ScheduleState scheduleState) {
-
+
final List<ScheduledFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final Callable<Boolean> continuallyRunTask;
final ProcessContext processContext;
-
+
// Determine the task to run and create it.
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
- final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
+ final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
contextFactory, scheduleState, standardProcContext);
-
+
continuallyRunTask = runnableTask;
processContext = standardProcContext;
} else {
processContext = new ConnectableProcessContext(connectable, encryptor);
continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext);
}
-
+
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
-
+
final Runnable yieldDetectionRunnable = new Runnable() {
@Override
public void run() {
@@ -122,50 +122,50 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
} catch (final Exception e) {
throw new ProcessException(e);
}
-
+
// If the component is yielded, cancel its future and re-submit it to run again
// after the yield has expired.
final long newYieldExpiration = connectable.getYieldExpiration();
- if ( newYieldExpiration > System.currentTimeMillis() ) {
+ if (newYieldExpiration > System.currentTimeMillis()) {
final long yieldMillis = System.currentTimeMillis() - newYieldExpiration;
final ScheduledFuture<?> scheduledFuture = futureRef.get();
- if ( scheduledFuture == null ) {
+ if (scheduledFuture == null) {
return;
}
-
+
// If we are able to cancel the future, create a new one and update the ScheduleState so that it has
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
// so that we can do this again the next time that the component is yielded.
if (scheduledFuture.cancel(false)) {
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
-
+
synchronized (scheduleState) {
- if ( scheduleState.isScheduled() ) {
- final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
+ if (scheduleState.isScheduled()) {
+ final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-
+
scheduleState.replaceFuture(scheduledFuture, newFuture);
futureRef.set(newFuture);
}
}
}
- } else if ( noWorkYieldNanos > 0L && shouldYield ) {
+ } else if (noWorkYieldNanos > 0L && shouldYield) {
// Component itself didn't yield but there was no work to do, so the framework will choose
// to yield the component automatically for a short period of time.
final ScheduledFuture<?> scheduledFuture = futureRef.get();
- if ( scheduledFuture == null ) {
+ if (scheduledFuture == null) {
return;
}
-
+
// If we are able to cancel the future, create a new one and update the ScheduleState so that it has
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
// so that we can do this again the next time that the component is yielded.
if (scheduledFuture.cancel(false)) {
synchronized (scheduleState) {
- if ( scheduleState.isScheduled() ) {
- final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, noWorkYieldNanos,
+ if (scheduleState.isScheduled()) {
+ final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, noWorkYieldNanos,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-
+
scheduleState.replaceFuture(scheduledFuture, newFuture);
futureRef.set(newFuture);
}
@@ -176,13 +176,13 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
};
// Schedule the task to run
- final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L,
+ final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-
+
// now that we have the future, set the atomic reference so that if the component is yielded we
// are able to then cancel this future.
futureRef.set(future);
-
+
// Keep track of the futures so that we can update the ScheduleState.
futures.add(future);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 1fde670..92fa3b2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -43,15 +43,16 @@ import org.w3c.dom.Element;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
-/**
- *
- */
public class ControllerServiceLoader {
private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
-
- public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException {
+ public static List<ControllerServiceNode> loadControllerServices(
+ final ControllerServiceProvider provider,
+ final InputStream serializedStream,
+ final StringEncryptor encryptor,
+ final BulletinRepository bulletinRepo,
+ final boolean autoResumeState) throws IOException {
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
documentBuilderFactory.setNamespaceAware(true);
@@ -87,66 +88,70 @@ public class ControllerServiceLoader {
throw err;
}
});
-
+
final Document document = builder.parse(in);
final Element controllerServices = document.getDocumentElement();
final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
- return new ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState));
+ return new ArrayList<>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState));
} catch (SAXException | ParserConfigurationException sxe) {
throw new IOException(sxe);
}
}
-
- public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
+
+ public static Collection<ControllerServiceNode> loadControllerServices(
+ final List<Element> serviceElements,
+ final ControllerServiceProvider provider,
+ final StringEncryptor encryptor,
+ final BulletinRepository bulletinRepo,
+ final boolean autoResumeState) {
final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
- for ( final Element serviceElement : serviceElements ) {
+ for (final Element serviceElement : serviceElements) {
final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor);
- // We need to clone the node because it will be used in a separate thread below, and
+ // We need to clone the node because it will be used in a separate thread below, and
// Element is not thread-safe.
nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true));
}
- for ( final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet() ) {
+ for (final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet()) {
configureControllerService(entry.getKey(), entry.getValue(), encryptor);
}
-
+
// Start services
- if ( autoResumeState ) {
+ if (autoResumeState) {
final Set<ControllerServiceNode> nodesToEnable = new HashSet<>();
-
- for ( final ControllerServiceNode node : nodeMap.keySet() ) {
+
+ for (final ControllerServiceNode node : nodeMap.keySet()) {
final Element controllerServiceElement = nodeMap.get(node);
final ControllerServiceDTO dto;
synchronized (controllerServiceElement.getOwnerDocument()) {
dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
}
-
+
final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState());
if (state == ControllerServiceState.ENABLED) {
nodesToEnable.add(node);
}
}
-
+
provider.enableControllerServices(nodesToEnable);
}
-
+
return nodeMap.keySet();
}
-
-
+
private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-
+
final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), false);
node.setName(dto.getName());
node.setComments(dto.getComments());
return node;
}
-
+
private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) {
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
node.setAnnotationData(dto.getAnnotationData());
-
+
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
if (entry.getValue() == null) {
node.removeProperty(entry.getKey());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index 8d46b05..02d6263 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -64,15 +64,15 @@ public class StandardControllerServiceInitializationContext implements Controlle
public boolean isControllerServiceEnabled(final ControllerService service) {
return serviceProvider.isControllerServiceEnabled(service);
}
-
+
@Override
public boolean isControllerServiceEnabling(String serviceIdentifier) {
return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
}
-
+
@Override
public String getControllerServiceName(final String serviceIdentifier) {
- return serviceProvider.getControllerServiceName(serviceIdentifier);
+ return serviceProvider.getControllerServiceName(serviceIdentifier);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index e768b9a..e577ffe 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -59,12 +59,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
this.serviceProvider = serviceProvider;
}
-
@Override
public ControllerService getProxiedControllerService() {
return proxedControllerService;
}
-
+
@Override
public ControllerService getControllerServiceImplementation() {
return implementation;
@@ -106,23 +105,23 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
}
}
-
+
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
onConfigured();
}
-
+
@Override
public boolean removeProperty(String name) {
final boolean removed = super.removeProperty(name);
- if ( removed ) {
+ if (removed) {
onConfigured();
}
-
+
return removed;
}
-
+
@SuppressWarnings("deprecation")
private void onConfigured() {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
@@ -132,97 +131,97 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
throw new ComponentLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
}
}
-
+
@Override
public void verifyCanDelete() {
- if ( getState() != ControllerServiceState.DISABLED ) {
+ if (getState() != ControllerServiceState.DISABLED) {
throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
}
}
-
+
@Override
public void verifyCanDisable() {
verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
}
-
+
@Override
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
final ControllerServiceState state = getState();
- if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
}
-
+
final ControllerServiceReference references = getReferences();
-
- for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) {
- if ( !ignoreReferences.contains(activeReference) ) {
+
+ for (final ConfiguredComponent activeReference : references.getActiveReferences()) {
+ if (!ignoreReferences.contains(activeReference)) {
throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running");
}
}
}
-
+
@Override
public void verifyCanEnable() {
- if ( getState() != ControllerServiceState.DISABLED ) {
+ if (getState() != ControllerServiceState.DISABLED) {
throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
}
-
- if ( !isValid() ) {
+
+ if (!isValid()) {
throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors());
}
}
-
+
@Override
public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
if (getState() != ControllerServiceState.DISABLED) {
throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
}
-
+
final Set<String> ids = new HashSet<>();
- for ( final ControllerServiceNode node : ignoredReferences ) {
+ for (final ControllerServiceNode node : ignoredReferences) {
ids.add(node.getIdentifier());
}
-
+
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
- for ( final ValidationResult result : validationResults ) {
- if ( !result.isValid() ) {
+ for (final ValidationResult result : validationResults) {
+ if (!result.isValid()) {
throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + result);
}
}
}
-
+
@Override
public void verifyCanUpdate() {
- if ( getState() != ControllerServiceState.DISABLED ) {
+ if (getState() != ControllerServiceState.DISABLED) {
throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
}
}
-
+
@Override
public String getComments() {
- readLock.lock();
- try {
- return comment;
- } finally {
- readLock.unlock();
- }
+ readLock.lock();
+ try {
+ return comment;
+ } finally {
+ readLock.unlock();
+ }
}
-
+
@Override
public void setComments(final String comment) {
- writeLock.lock();
- try {
- this.comment = comment;
- } finally {
- writeLock.unlock();
- }
+ writeLock.lock();
+ try {
+ this.comment = comment;
+ } finally {
+ writeLock.unlock();
+ }
}
-
+
@Override
public ControllerServiceState getState() {
return stateRef.get();
}
-
+
@Override
public void setState(final ControllerServiceState state) {
this.stateRef.set(state);