You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/30 19:01:07 UTC
[4/7] incubator-nifi git commit: NIFI-250: Updated controller
services to use appropriate defaults and use .identifiesControllerService
instead of using the old way of obtaining controller services;
do not fail to start up if controller service is invalid
NIFI-250: Updated controller services to use appropriate defaults and use .identifiesControllerService instead of using the old way of obtaining controller services; do not fail to start up if controller service is invalid
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3344cef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3344cef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3344cef3
Branch: refs/heads/NIFI-250
Commit: 3344cef3365dfc620f11aaf18fcfc8ba7d58e16a
Parents: 1682e47
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 09:05:58 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 09:05:58 2015 -0500
----------------------------------------------------------------------
.../controller/StandardFlowSynchronizer.java | 48 ++++++++++++++++++--
.../nifi/processors/standard/PostHTTP.java | 38 ++++++++++------
.../DistributedMapCacheClientService.java | 18 +++-----
.../DistributedSetCacheClientService.java | 11 ++---
.../cache/server/DistributedCacheServer.java | 9 ++--
.../nifi/ssl/StandardSSLContextService.java | 2 -
6 files changed, 82 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 33650e1..0619793 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -100,9 +100,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowSynchronizer.class);
public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd");
private final StringEncryptor encryptor;
+ private final boolean autoResumeState;
public StandardFlowSynchronizer(final StringEncryptor encryptor) {
this.encryptor = encryptor;
+ autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
}
public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
@@ -349,7 +351,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
node.setName(dto.getName());
node.setAvailability(Availability.valueOf(dto.getAvailability()));
node.setComments(dto.getComments());
- node.setDisabled(dto.getEnabled() != Boolean.TRUE);
node.setAnnotationData(dto.getAnnotationData());
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
@@ -360,9 +361,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
- if ( dto.getEnabled() == Boolean.TRUE ) {
- controller.enableControllerService(node);
- }
+ if ( autoResumeState ) {
+ if ( dto.getEnabled() == Boolean.TRUE ) {
+ try {
+ controller.enableControllerService(node);
+ } catch (final Exception e) {
+ logger.error("Failed to enable " + node + " due to " + e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Controller Service", Severity.ERROR.name(), "Could not start " + node + " due to " + e));
+ }
+ }
+ }
}
private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
@@ -384,7 +397,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
reportingTask.setComments(dto.getComment());
reportingTask.setAvailability(Availability.valueOf(dto.getAvailability()));
reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
- reportingTask.setScheduledState(ScheduledState.valueOf(dto.getScheduledState()));
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
reportingTask.setAnnotationData(dto.getAnnotationData());
@@ -396,6 +408,32 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
reportingTask.setProperty(entry.getKey(), entry.getValue());
}
}
+
+ if ( autoResumeState ) {
+ if ( ScheduledState.RUNNING.name().equals(dto.getScheduledState()) ) {
+ try {
+ controller.startReportingTask(reportingTask);
+ } catch (final Exception e) {
+ logger.error("Failed to start {} due to {}", reportingTask, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
+ }
+ } else if ( ScheduledState.DISABLED.name().equals(dto.getScheduledState()) ) {
+ try {
+ controller.disableReportingTask(reportingTask);
+ } catch (final Exception e) {
+ logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
+ }
+ }
+ }
}
private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index fd486b0..ff7475b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -68,6 +68,7 @@ import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.ContentProducer;
@@ -352,22 +353,29 @@ public class PostHTTP extends AbstractProcessor {
private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
CertificateException, KeyManagementException, UnrecoverableKeyException
{
- final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
- try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
- truststore.load(in, service.getTrustStorePassword().toCharArray());
- }
-
- final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
- try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
- keystore.load(in, service.getKeyStorePassword().toCharArray());
- }
-
- SSLContext sslContext = SSLContexts.custom()
- .loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
- .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray())
- .build();
+ final SSLContextBuilder builder = SSLContexts.custom();
+
+ final String trustStoreFilename = service.getTrustStoreFile();
+ if ( trustStoreFilename != null ) {
+ final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
+ try (final InputStream in = new FileInputStream(new File(trustStoreFilename))) {
+ truststore.load(in, service.getTrustStorePassword().toCharArray());
+ }
+
+ builder.loadTrustMaterial(truststore, new TrustSelfSignedStrategy());
+ }
- return sslContext;
+ final String keyStoreFilename = service.getKeyStoreFile();
+ if ( keyStoreFilename != null ) {
+ final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
+ try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
+ keystore.load(in, service.getKeyStorePassword().toCharArray());
+ }
+
+ builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
+ }
+
+ return builder.build();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 6dad80b..956ed16 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedMapCacheClientService extends AbstractControllerService
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description(
- "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+ .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+ + "remote server. If not specified, communications will not be encrypted")
.required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
- .defaultValue(null)
+ .identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
- .description(
- "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+ .description("Specifies how long to wait when communicating with the remote server before determining that "
+ + "there is a communications failure if data cannot be sent or received")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
@@ -94,8 +93,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
@Override
- public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
- throws IOException {
+ public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withCommsSession(new CommsAction<Boolean>() {
@Override
public Boolean execute(final CommsSession session) throws IOException {
@@ -131,8 +129,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
@Override
- public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
- final Deserializer<V> valueDeserializer) throws IOException {
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(new CommsAction<V>() {
@Override
public V execute(final CommsSession session) throws IOException {
@@ -297,7 +294,6 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
private static interface CommsAction<T> {
-
T execute(CommsSession commsSession) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index dcc5558..67a2d43 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedSetCacheClientService extends AbstractControllerService
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description(
- "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+ .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+ + "remote server. If not specified, communications will not be encrypted")
.required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
- .defaultValue(null)
+ .identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
- .description(
- "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+ .description("Specifices how long to wait when communicating with the remote server before determining "
+ + "that there is a communications failure if data cannot be sent or received")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 0817479..66b73e2 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -42,10 +42,10 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description(
- "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+ .description("If specified, this service will be used to create an SSL Context that will be used "
+ + "to secure communications; if not specified, communications will not be secure")
.required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+ .identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
.name("Maximum Cache Entries")
@@ -77,8 +77,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
properties.add(MAX_CACHE_ENTRIES);
properties.add(EVICTION_POLICY);
properties.add(PERSISTENCE_PATH);
- properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
- getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+ properties.add(SSL_CONTEXT_SERVICE);
return properties;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
index ae2e19c..39bb5fb 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
@@ -62,7 +62,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
.description("The Type of the Truststore. Either JKS or PKCS12")
.allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue(STORE_TYPE_JKS)
.sensitive(false)
.build();
public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
@@ -84,7 +83,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
.description("The Type of the Keystore")
.allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue(STORE_TYPE_JKS)
.sensitive(false)
.build();
public static final PropertyDescriptor KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()