You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/02/06 04:36:43 UTC
[47/50] nifi git commit: NIFI-259 Corrected some logic in
AbstractListProcessor regarding being elected primary node and improved
readability in GetHBase a smidge
NIFI-259 Corrected some logic in AbstractListProcessor regarding being elected primary node and improved readability in GetHBase a smidge
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/29fb9c93
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/29fb9c93
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/29fb9c93
Branch: refs/heads/master
Commit: 29fb9c939368054bd199edca2f3653796e586927
Parents: 6f4c3db
Author: jpercivall <jo...@yahoo.com>
Authored: Thu Feb 4 16:07:18 2016 -0500
Committer: jpercivall <jo...@yahoo.com>
Committed: Thu Feb 4 16:07:18 2016 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/nifi/hbase/GetHBase.java | 14 ++++++--------
.../processors/standard/AbstractListProcessor.java | 12 ++++++------
2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/29fb9c93/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 65b261a..fa4d80a 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -107,7 +107,7 @@ public class GetHBase extends AbstractProcessor {
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" +
" so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
- .required(false)
+ .required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
@@ -156,7 +156,7 @@ public class GetHBase extends AbstractProcessor {
private volatile ScanResult lastResult = null;
private volatile List<Column> columns = new ArrayList<>();
- private volatile boolean electedPrimaryNode = false;
+ private volatile boolean justElectedPrimaryNode = false;
private volatile String previousTable = null;
@Override
@@ -236,9 +236,7 @@ public class GetHBase extends AbstractProcessor {
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
- if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
- electedPrimaryNode = true;
- }
+ justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
@OnRemoved
@@ -411,7 +409,7 @@ public class GetHBase extends AbstractProcessor {
lastResult = scanResult;
}
- // save state to local storage and to distributed cache
+ // save state using the framework's state manager
storeState(lastResult, context.getStateManager());
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
@@ -478,7 +476,7 @@ public class GetHBase extends AbstractProcessor {
ScanResult scanResult = lastResult;
// if we have no previous result, or we just became primary, pull from distributed cache
- if (scanResult == null || electedPrimaryNode) {
+ if (scanResult == null || justElectedPrimaryNode) {
if (client != null) {
final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
if (obj == null || !(obj instanceof ScanResult)) {
@@ -490,7 +488,7 @@ public class GetHBase extends AbstractProcessor {
}
// no requirement to pull an update from the distributed cache anymore.
- electedPrimaryNode = false;
+ justElectedPrimaryNode = false;
}
// Check the persistence file. We want to use the latest timestamp that we have so that
http://git-wip-us.apache.org/repos/asf/nifi/blob/29fb9c93/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index b1c683c..53d0604 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -163,7 +163,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
private volatile Set<String> latestIdentifiersListed = new HashSet<>();
private volatile Long lastListingTime = null;
- private volatile boolean electedPrimaryNode = false;
+ private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetListing = false;
static final String TIMESTAMP = "timestamp";
@@ -198,9 +198,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
- if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
- electedPrimaryNode = true;
- }
+ justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
@OnScheduled
@@ -222,7 +220,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// delete the local file, since it is no longer needed
final File localFile = new File(path);
- if (localFile.exists() && !!localFile.delete()) {
+ if (localFile.exists() && !localFile.delete()) {
getLogger().warn("Migrated state but failed to delete local persistence file");
}
@@ -322,7 +320,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
try {
// We need to fetch the state from the cluster if we don't yet know the last listing time,
// or if we were just elected the primary node
- if (this.lastListingTime == null || electedPrimaryNode) {
+ if (this.lastListingTime == null || justElectedPrimaryNode) {
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final Map<String, String> stateValues = stateMap.toMap();
final String timestamp = stateValues.get(TIMESTAMP);
@@ -343,6 +341,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
latestIdentifiersListed.add(value);
}
}
+
+ justElectedPrimaryNode = false;
}
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");