You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/30 19:01:07 UTC

[4/7] incubator-nifi git commit: NIFI-250: Updated controller services to use appropriate defaults and to use .identifiesControllerService instead of the old way of obtaining controller services; do not fail to start up if a controller service is invalid

NIFI-250: Updated controller services to use appropriate defaults and to use .identifiesControllerService instead of the old way of obtaining controller services; do not fail to start up if a controller service is invalid


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3344cef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3344cef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3344cef3

Branch: refs/heads/NIFI-250
Commit: 3344cef3365dfc620f11aaf18fcfc8ba7d58e16a
Parents: 1682e47
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 30 09:05:58 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 30 09:05:58 2015 -0500

----------------------------------------------------------------------
 .../controller/StandardFlowSynchronizer.java    | 48 ++++++++++++++++++--
 .../nifi/processors/standard/PostHTTP.java      | 38 ++++++++++------
 .../DistributedMapCacheClientService.java       | 18 +++-----
 .../DistributedSetCacheClientService.java       | 11 ++---
 .../cache/server/DistributedCacheServer.java    |  9 ++--
 .../nifi/ssl/StandardSSLContextService.java     |  2 -
 6 files changed, 82 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 33650e1..0619793 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -100,9 +100,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     private static final Logger logger = LoggerFactory.getLogger(StandardFlowSynchronizer.class);
     public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd");
     private final StringEncryptor encryptor;
+    private final boolean autoResumeState;
 
     public StandardFlowSynchronizer(final StringEncryptor encryptor) {
         this.encryptor = encryptor;
+        autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
     }
 
     public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
@@ -349,7 +351,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     	node.setName(dto.getName());
     	node.setAvailability(Availability.valueOf(dto.getAvailability()));
     	node.setComments(dto.getComments());
-    	node.setDisabled(dto.getEnabled() != Boolean.TRUE);
     	node.setAnnotationData(dto.getAnnotationData());
     	
         for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
@@ -360,9 +361,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             }
         }
 
-    	if ( dto.getEnabled() == Boolean.TRUE ) {
-    		controller.enableControllerService(node);
-    	}
+        if ( autoResumeState ) {
+	    	if ( dto.getEnabled() == Boolean.TRUE ) {
+	    		try {
+	    			controller.enableControllerService(node);
+	    		} catch (final Exception e) {
+	    			logger.error("Failed to enable " + node + " due to " + e);
+	    			if ( logger.isDebugEnabled() ) {
+	    				logger.error("", e);
+	    			}
+	    			
+	    			controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	    					"Controller Service", Severity.ERROR.name(), "Could not start " + node + " due to " + e));
+	    		}
+	    	}
+        }
     }
     
     private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
@@ -384,7 +397,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     	reportingTask.setComments(dto.getComment());
     	reportingTask.setAvailability(Availability.valueOf(dto.getAvailability()));
     	reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
-    	reportingTask.setScheduledState(ScheduledState.valueOf(dto.getScheduledState()));
     	reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
     	
     	reportingTask.setAnnotationData(dto.getAnnotationData());
@@ -396,6 +408,32 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             	reportingTask.setProperty(entry.getKey(), entry.getValue());
             }
         }
+        
+        if ( autoResumeState ) {
+	        if ( ScheduledState.RUNNING.name().equals(dto.getScheduledState()) ) {
+	        	try {
+	        		controller.startReportingTask(reportingTask);
+	        	} catch (final Exception e) {
+	        		logger.error("Failed to start {} due to {}", reportingTask, e);
+	        		if ( logger.isDebugEnabled() ) {
+	        			logger.error("", e);
+	        		}
+	        		controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	        				"Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e));
+	        	}
+	        } else if ( ScheduledState.DISABLED.name().equals(dto.getScheduledState()) ) {
+	        	try {
+	        		controller.disableReportingTask(reportingTask);
+	        	} catch (final Exception e) {
+	        		logger.error("Failed to mark {} as disabled due to {}", reportingTask, e);
+	        		if ( logger.isDebugEnabled() ) {
+	        			logger.error("", e);
+	        		}
+	        		controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+	        				"Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e));
+	        	}
+	        }
+        }
     }
 
     private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index fd486b0..ff7475b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -68,6 +68,7 @@ import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.conn.ManagedHttpClientConnection;
 import org.apache.http.conn.socket.ConnectionSocketFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
 import org.apache.http.conn.ssl.SSLContexts;
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.entity.ContentProducer;
@@ -352,22 +353,29 @@ public class PostHTTP extends AbstractProcessor {
     private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, 
         CertificateException, KeyManagementException, UnrecoverableKeyException 
     {
-        final KeyStore truststore  = KeyStore.getInstance(service.getTrustStoreType());
-        try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
-            truststore.load(in, service.getTrustStorePassword().toCharArray());
-        }
-        
-        final KeyStore keystore  = KeyStore.getInstance(service.getKeyStoreType());
-        try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
-            keystore.load(in, service.getKeyStorePassword().toCharArray());
-        }
-        
-        SSLContext sslContext = SSLContexts.custom()
-                .loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
-                .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray())
-                .build();
+    	final SSLContextBuilder builder = SSLContexts.custom();
+    	
+    	final String trustStoreFilename = service.getTrustStoreFile();
+    	if ( trustStoreFilename != null ) {
+	        final KeyStore truststore  = KeyStore.getInstance(service.getTrustStoreType());
+	        try (final InputStream in = new FileInputStream(new File(trustStoreFilename))) {
+	            truststore.load(in, service.getTrustStorePassword().toCharArray());
+	        }
+	        
+	        builder.loadTrustMaterial(truststore, new TrustSelfSignedStrategy());
+    	}
         
-        return sslContext;
+    	final String keyStoreFilename = service.getKeyStoreFile();
+    	if ( keyStoreFilename != null ) {
+	        final KeyStore keystore  = KeyStore.getInstance(service.getKeyStoreType());
+	        try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
+	            keystore.load(in, service.getKeyStorePassword().toCharArray());
+	        }
+	        
+	        builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
+    	}
+    	
+        return builder.build();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 6dad80b..956ed16 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedMapCacheClientService extends AbstractControllerService
             .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description(
-                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+            		+ "remote server. If not specified, communications will not be encrypted")
             .required(false)
-            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
-            .defaultValue(null)
+            .identifiesControllerService(SSLContextService.class)
             .build();
     public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
             .name("Communications Timeout")
-            .description(
-                    "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .description("Specifies how long to wait when communicating with the remote server before determining that "
+            		+ "there is a communications failure if data cannot be sent or received")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .defaultValue("30 secs")
@@ -94,8 +93,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     @Override
-    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
-            throws IOException {
+    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
         return withCommsSession(new CommsAction<Boolean>() {
             @Override
             public Boolean execute(final CommsSession session) throws IOException {
@@ -131,8 +129,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     @Override
-    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
-            final Deserializer<V> valueDeserializer) throws IOException {
+    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
         return withCommsSession(new CommsAction<V>() {
             @Override
             public V execute(final CommsSession session) throws IOException {
@@ -297,7 +294,6 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     private static interface CommsAction<T> {
-
         T execute(CommsSession commsSession) throws IOException;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index dcc5558..67a2d43 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -59,16 +59,15 @@ public class DistributedSetCacheClientService extends AbstractControllerService
             .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description(
-                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+            		+ "remote server. If not specified, communications will not be encrypted")
             .required(false)
-            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
-            .defaultValue(null)
+            .identifiesControllerService(SSLContextService.class)
             .build();
     public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
             .name("Communications Timeout")
-            .description(
-                    "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .description("Specifices how long to wait when communicating with the remote server before determining "
+            		+ "that there is a communications failure if data cannot be sent or received")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .defaultValue("30 secs")

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 0817479..66b73e2 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -42,10 +42,10 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
             .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description(
-                    "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+            .description("If specified, this service will be used to create an SSL Context that will be used "
+            		+ "to secure communications; if not specified, communications will not be secure")
             .required(false)
-            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .identifiesControllerService(SSLContextService.class)
             .build();
     public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
             .name("Maximum Cache Entries")
@@ -77,8 +77,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
         properties.add(MAX_CACHE_ENTRIES);
         properties.add(EVICTION_POLICY);
         properties.add(PERSISTENCE_PATH);
-        properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
-                getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+        properties.add(SSL_CONTEXT_SERVICE);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3344cef3/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
index ae2e19c..39bb5fb 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
@@ -62,7 +62,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
             .description("The Type of the Truststore. Either JKS or PKCS12")
             .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue(STORE_TYPE_JKS)
             .sensitive(false)
             .build();
     public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
@@ -84,7 +83,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
             .description("The Type of the Keystore")
             .allowableValues(STORE_TYPE_JKS, STORE_TYPE_PKCS12)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue(STORE_TYPE_JKS)
             .sensitive(false)
             .build();
     public static final PropertyDescriptor KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()