You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/29 13:53:39 UTC
[1/3] incubator-nifi git commit: NIFI-526 addressed the two
checkstyle findings. Curious how they got in there
Repository: incubator-nifi
Updated Branches:
refs/heads/ListHDFS 00b686b0b -> 591b14307
NIFI-526 addressed the two checkstyle findings. Curious how they got in there
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/57e78bf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/57e78bf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/57e78bf7
Branch: refs/heads/ListHDFS
Commit: 57e78bf723c0f1ddb642475e0b16cdf52c8ee1fd
Parents: 8c00623
Author: joewitt <jo...@apache.org>
Authored: Tue Apr 28 15:31:27 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Apr 28 15:31:27 2015 -0400
----------------------------------------------------------------------
.../main/java/org/apache/nifi/processors/standard/GetHTTP.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/57e78bf7/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index a1f57da..db32e30 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -435,7 +435,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
readLock.unlock();
writeLock.lock();
try {
- if (timeToPersist < System.currentTimeMillis()) {
+ if (timeToPersist < System.currentTimeMillis()) {
timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC;
File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
try (FileOutputStream fos = new FileOutputStream(httpCache)) {
@@ -447,8 +447,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
getLogger().error("Failed to persist ETag and LastMod due to " + e, e);
}
}
- }
- finally {
+ } finally {
readLock.lock();
writeLock.unlock();
}
[3/3] incubator-nifi git commit: Merge branch 'develop' into ListHDFS
Posted by ma...@apache.org.
Merge branch 'develop' into ListHDFS
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/591b1430
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/591b1430
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/591b1430
Branch: refs/heads/ListHDFS
Commit: 591b14307cfb061dd2ecc9f57ba0e39a387ebabe
Parents: ebee094 57e78bf
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Apr 29 07:45:02 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Apr 29 07:45:02 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[2/3] incubator-nifi git commit: NIFI-533: Fixed checkstyle issues
Posted by ma...@apache.org.
NIFI-533: Fixed checkstyle issues
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ebee094f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ebee094f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ebee094f
Branch: refs/heads/ListHDFS
Commit: ebee094ff3ef02c0daea5247989b0d9d6a7337d4
Parents: 00b686b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Apr 29 07:44:19 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Apr 29 07:44:19 2015 -0400
----------------------------------------------------------------------
.../notification/OnPrimaryNodeStateChange.java | 4 +-
.../notification/PrimaryNodeState.java | 20 ++---
.../apache/nifi/controller/FlowController.java | 28 +++---
.../nifi/processors/standard/GetHTTP.java | 31 ++++---
.../standard/TestDetectDuplicate.java | 32 +++----
.../DistributedMapCacheClientService.java | 91 ++++++++++----------
.../cache/server/map/PersistentMapCache.java | 19 ++--
7 files changed, 112 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ebee094f/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java
index e073660..4ea2170 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java
@@ -25,10 +25,10 @@ import java.lang.annotation.Target;
/**
* <p>
- * Marker annotation that a component can use to indicate that a method should be
+ * Marker annotation that a component can use to indicate that a method should be
* called whenever the state of the Primary Node in a cluster has changed.
* </p>
- *
+ *
* <p>
* Methods with this annotation should take either no arguments or one argument of type
* {@link PrimaryNodeState}. The {@link PrimaryNodeState} provides context about what changed
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ebee094f/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java
index 3a7245c..0d65b65 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java
@@ -20,14 +20,14 @@ package org.apache.nifi.annotation.notification;
* Represents a state change that occurred for the Primary Node of a NiFi cluster.
*/
public enum PrimaryNodeState {
- /**
- * The node receiving this state has been elected the Primary Node of the NiFi cluster.
- */
- ELECTED_PRIMARY_NODE,
-
- /**
- * The node receiving this state was the Primary Node but has now had its Primary Node
- * role revoked.
- */
- PRIMARY_NODE_REVOKED;
+ /**
+ * The node receiving this state has been elected the Primary Node of the NiFi cluster.
+ */
+ ELECTED_PRIMARY_NODE,
+
+ /**
+ * The node receiving this state was the Primary Node but has now had its Primary Node
+ * role revoked.
+ */
+ PRIMARY_NODE_REVOKED;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ebee094f/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index e9eeaf2..6c655cb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -296,7 +296,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*/
private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
- private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
+ private final AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
// guarded by rwLock
/**
@@ -449,7 +449,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.protocolSender = protocolSender;
try {
this.templateManager = new TemplateManager(properties.getTemplateDirectory());
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException(e);
}
@@ -794,7 +794,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws NullPointerException if either arg is null
* @throws ProcessorInstantiationException if the processor cannot be instantiated for any reason
*/
- public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException {
+ public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException {
return createProcessor(type, id, true);
}
@@ -1508,7 +1508,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
if (config.getProperties() != null) {
- for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+ for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
if (entry.getValue() != null) {
procNode.setProperty(entry.getKey(), entry.getValue());
}
@@ -1661,7 +1661,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
if (ports != null) {
remotePorts = new LinkedHashSet<>(ports.size());
- for (RemoteProcessGroupPortDTO port : ports) {
+ for (final RemoteProcessGroupPortDTO port : ports) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
descriptor.setId(port.getId());
descriptor.setName(port.getName());
@@ -3024,15 +3024,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = getGroup(getRootGroupId());
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
-
+
// update primary
this.primary = primary;
eventDrivenWorkerQueue.setPrimary(primary);
@@ -3092,7 +3092,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isInputAvailable() {
try {
return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()));
- } catch (IOException e) {
+ } catch (final IOException e) {
return false;
}
}
@@ -3101,7 +3101,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isOutputAvailable() {
try {
return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier()));
- } catch (IOException e) {
+ } catch (final IOException e) {
return false;
}
}
@@ -3401,7 +3401,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final NodeProtocolSender protocolSender;
private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
- public BulletinsTask(NodeProtocolSender protocolSender) {
+ public BulletinsTask(final NodeProtocolSender protocolSender) {
if (protocolSender == null) {
throw new IllegalArgumentException("NodeProtocolSender may not be null.");
}
@@ -3557,7 +3557,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private class HeartbeatMessageGeneratorTask implements Runnable {
- private AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>();
+ private final AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>();
@Override
public void run() {
@@ -3624,7 +3624,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
@Override
- public List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, int maxRecords) throws IOException {
+ public List<ProvenanceEventRecord> getProvenanceEvents(final long firstEventId, final int maxRecords) throws IOException {
return new ArrayList<ProvenanceEventRecord>(provenanceEventRepository.getEvents(firstEventId, maxRecords));
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ebee094f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index a1f57da..1654a4f 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -185,7 +185,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
static final String LAST_MODIFIED = "LastModified";
static {
- SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US);
+ final SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L));
}
@@ -221,13 +221,13 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
this.properties = Collections.unmodifiableList(properties);
// load etag and lastModified from file
- File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+ final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
try (FileInputStream fis = new FileInputStream(httpCache)) {
- Properties props = new Properties();
+ final Properties props = new Properties();
props.load(fis);
entityTagRef.set(props.getProperty(ETAG));
lastModifiedRef.set(props.getProperty(LAST_MODIFIED));
- } catch (IOException swallow) {
+ } catch (final IOException swallow) {
}
}
@@ -242,20 +242,20 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
}
@Override
- public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
entityTagRef.set("");
lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE);
}
@OnShutdown
public void onShutdown() {
- File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+ final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
try (FileOutputStream fos = new FileOutputStream(httpCache)) {
- Properties props = new Properties();
+ final Properties props = new Properties();
props.setProperty(ETAG, entityTagRef.get());
props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
props.store(fos, "GetHTTP file modification values");
- } catch (IOException swallow) {
+ } catch (final IOException swallow) {
}
}
@@ -287,7 +287,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
keystore.load(in, service.getKeyStorePassword().toCharArray());
}
- SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build();
+ final SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build();
return sslContext;
}
@@ -310,7 +310,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
try {
uri = new URI(url);
source = uri.getHost();
- } catch (URISyntaxException swallow) {
+ } catch (final URISyntaxException swallow) {
// this won't happen as the url has already been validated
}
@@ -435,20 +435,19 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
readLock.unlock();
writeLock.lock();
try {
- if (timeToPersist < System.currentTimeMillis()) {
+ if (timeToPersist < System.currentTimeMillis()) {
timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC;
- File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
+ final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
try (FileOutputStream fos = new FileOutputStream(httpCache)) {
- Properties props = new Properties();
+ final Properties props = new Properties();
props.setProperty(ETAG, entityTagRef.get());
props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
props.store(fos, "GetHTTP file modification values");
- } catch (IOException e) {
+ } catch (final IOException e) {
getLogger().error("Failed to persist ETag and LastMod due to " + e, e);
}
}
- }
- finally {
+ } finally {
readLock.lock();
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ebee094f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index 6e9fb1f..4166d94 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -57,7 +57,7 @@ public class TestDetectDuplicate {
@Test
public void testDuplicate() throws InitializationException {
- TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
+ final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
final DistributedMapCacheClientImpl client = createClient();
final Map<String, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
@@ -65,7 +65,7 @@ public class TestDetectDuplicate {
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours");
- Map<String, String> props = new HashMap<>();
+ final Map<String, String> props = new HashMap<>();
props.put("hash.value", "1000");
runner.enqueue(new byte[]{}, props);
runner.enableControllerService(client);
@@ -84,7 +84,7 @@ public class TestDetectDuplicate {
@Test
public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException {
- TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
+ final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
final DistributedMapCacheClientImpl client = createClient();
final Map<String, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
@@ -94,7 +94,7 @@ public class TestDetectDuplicate {
runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs");
runner.enableControllerService(client);
- Map<String, String> props = new HashMap<>();
+ final Map<String, String> props = new HashMap<>();
props.put("hash.value", "1000");
runner.enqueue(new byte[]{}, props);
@@ -114,7 +114,7 @@ public class TestDetectDuplicate {
final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl();
final ComponentLog logger = new MockProcessorLog("client", client);
- MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger);
+ final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger);
client.initialize(clientInitContext);
return client;
@@ -130,12 +130,12 @@ public class TestDetectDuplicate {
}
@Override
- public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
}
@Override
protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- List<PropertyDescriptor> props = new ArrayList<>();
+ final List<PropertyDescriptor> props = new ArrayList<>();
props.add(DistributedMapCacheClientService.HOSTNAME);
props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
props.add(DistributedMapCacheClientService.PORT);
@@ -144,7 +144,7 @@ public class TestDetectDuplicate {
}
@Override
- public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
if (exists) {
return false;
}
@@ -154,7 +154,8 @@ public class TestDetectDuplicate {
}
@Override
- public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+ final Deserializer<V> valueDeserializer) throws IOException {
if (exists) {
return (V) cacheValue;
}
@@ -163,25 +164,24 @@ public class TestDetectDuplicate {
}
@Override
- public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+ public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return exists;
}
@Override
- public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return null;
}
@Override
- public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+ public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
exists = false;
return true;
}
- @Override
- public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
-
- }
+ @Override
+ public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+ }
}
private static class StringSerializer implements Serializer<String> {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ebee094f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index ea86071..c03dd5a 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -46,39 +46,39 @@ import org.slf4j.LoggerFactory;
@Tags({"distributed", "cache", "state", "map", "cluster"})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
- + "between nodes in a NiFi cluster")
+ + "between nodes in a NiFi cluster")
public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
- .name("Server Hostname")
- .description("The name of the server that is running the DistributedMapCacheServer service")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Server Hostname")
+ .description("The name of the server that is running the DistributedMapCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
- .name("Server Port")
- .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
- .required(true)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .defaultValue("4557")
- .build();
+ .name("Server Port")
+ .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .defaultValue("4557")
+ .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description("If specified, indicates the SSL Context Service that is used to communicate with the "
- + "remote server. If not specified, communications will not be encrypted")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
+ .name("SSL Context Service")
+ .description("If specified, indicates the SSL Context Service that is used to communicate with the "
+ + "remote server. If not specified, communications will not be encrypted")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Communications Timeout")
- .description("Specifies how long to wait when communicating with the remote server before determining that "
- + "there is a communications failure if data cannot be sent or received")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("30 secs")
- .build();
+ .name("Communications Timeout")
+ .description("Specifies how long to wait when communicating with the remote server before determining that "
+ + "there is a communications failure if data cannot be sent or received")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 secs")
+ .build();
private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
private volatile ConfigurationContext configContext;
@@ -118,28 +118,29 @@ public class DistributedMapCacheClientService extends AbstractControllerService
});
}
+ @Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
- withCommsSession(new CommsAction<Object>() {
- @Override
- public Object execute(final CommsSession session) throws IOException {
- final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
- dos.writeUTF("put");
-
- serialize(key, keySerializer, dos);
- serialize(value, valueSerializer, dos);
-
- dos.flush();
- final DataInputStream dis = new DataInputStream(session.getInputStream());
- final boolean success = dis.readBoolean();
- if ( !success ) {
- throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response");
- }
-
- return null;
- }
- });
+ withCommsSession(new CommsAction<Object>() {
+ @Override
+ public Object execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("put");
+
+ serialize(key, keySerializer, dos);
+ serialize(value, valueSerializer, dos);
+
+ dos.flush();
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ final boolean success = dis.readBoolean();
+ if ( !success ) {
+ throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response");
+ }
+
+ return null;
+ }
+ });
}
-
+
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return withCommsSession(new CommsAction<Boolean>() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ebee094f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 663f441..c2fc0d7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -35,7 +35,6 @@ import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
public class PersistentMapCache implements MapCache {
-
private final MapCache wrapped;
private final WriteAheadRepository<MapWaliRecord> wali;
@@ -78,10 +77,10 @@ public class PersistentMapCache implements MapCache {
return putResult;
}
-
+
@Override
public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
- final MapPutResult putResult = wrapped.put(key, value);
+ final MapPutResult putResult = wrapped.put(key, value);
if ( putResult.isSuccessful() ) {
// The put was successful.
final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
@@ -91,15 +90,15 @@ public class PersistentMapCache implements MapCache {
if ( putResult.getEvictedKey() != null ) {
records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
}
-
+
wali.update(Collections.singletonList(record), false);
-
+
final long modCount = modifications.getAndIncrement();
if ( modCount > 0 && modCount % 100000 == 0 ) {
wali.checkpoint();
}
}
-
+
return putResult;
}
@@ -114,7 +113,7 @@ public class PersistentMapCache implements MapCache {
}
@Override
- public ByteBuffer remove(ByteBuffer key) throws IOException {
+ public ByteBuffer remove(final ByteBuffer key) throws IOException {
final ByteBuffer removeResult = wrapped.remove(key);
if (removeResult != null) {
final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult);
@@ -163,7 +162,7 @@ public class PersistentMapCache implements MapCache {
private static class Serde implements SerDe<MapWaliRecord> {
@Override
- public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException {
+ public void serializeEdit(final MapWaliRecord previousRecordState, final MapWaliRecord newRecordState, final java.io.DataOutputStream out) throws IOException {
final UpdateType updateType = newRecordState.getUpdateType();
if (updateType == UpdateType.DELETE) {
out.write(0);
@@ -181,7 +180,7 @@ public class PersistentMapCache implements MapCache {
}
@Override
- public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException {
+ public void serializeRecord(final MapWaliRecord record, final java.io.DataOutputStream out) throws IOException {
serializeEdit(null, record, out);
}
@@ -206,7 +205,7 @@ public class PersistentMapCache implements MapCache {
}
@Override
- public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException {
+ public MapWaliRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version);
}