You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/02/06 04:36:03 UTC
[07/50] nifi git commit: NIFI-259: Bug fixes
NIFI-259: Bug fixes
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0cd6f80f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0cd6f80f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0cd6f80f
Branch: refs/heads/master
Commit: 0cd6f80f3624f4d46af63f444c3a0175021d2891
Parents: 06f525b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 13 15:11:53 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jan 13 15:11:53 2016 -0500
----------------------------------------------------------------------
.../src/main/asciidoc/administration-guide.adoc | 2 ++
.../zookeeper/ZooKeeperStateProvider.java | 3 +++
.../standard/AbstractListProcessor.java | 19 ++++++++++---------
.../nifi/processors/standard/TailFile.java | 1 +
4 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 23327e2..8945600 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -553,6 +553,8 @@ Note, the above `kinit` command requires that Kerberos client libraries be insta
[source]
yum install krb5-workstation krb5-libs krb5-auth-dialog
+Once this is complete, the /etc/krb5.conf will need to be configured appropriately for your organization's Kerberos envrionment.
+
Now, when we start NiFi, it will use Kerberos to authentication as the `nifi` user when communicating with ZooKeeper.
http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index faa0364..984a229 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -310,6 +310,9 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
invalidateClient();
setState(stateValues, version, componentId);
}
+ if (Code.NODEEXISTS == ke.code()) {
+ setState(stateValues, version, componentId);
+ }
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke);
} catch (final IOException ioe) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 494f227..fc19ad7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -209,12 +209,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
// Check if state already exists for this path. If so, we have already migrated the state.
- final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
+ final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
if (stateMap.getVersion() == -1L) {
try {
// Migrate state from the old way of managing state (distributed cache service and local file)
// to the new mechanism (State Manager).
- migrateState(path, client, context.getStateManager());
+ migrateState(path, client, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
throw new IOException("Failed to properly migrate state to State Manager", ioe);
}
@@ -237,7 +237,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (resetListing) {
- context.getStateManager().clear(Scope.CLUSTER);
+ context.getStateManager().clear(getStateScope(context));
resetListing = false;
}
}
@@ -250,9 +250,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* @param path the path to migrate state for
* @param client the DistributedMapCacheClient that is capable of obtaining the current state
* @param stateManager the StateManager to use in order to store the new state
+ * @param scope the scope to use
* @throws IOException if unable to retrieve or store the state
*/
- private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager) throws IOException {
+ private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException {
Long minTimestamp = null;
final Set<String> latestIdentifiersListed = new HashSet<>();
@@ -289,11 +290,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (minTimestamp != null) {
- persist(minTimestamp, latestIdentifiersListed, stateManager);
+ persist(minTimestamp, latestIdentifiersListed, stateManager, scope);
}
}
- private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager) throws IOException {
+ private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager, final Scope scope) throws IOException {
final Map<String, String> updatedState = new HashMap<>(identifiers.size() + 1);
updatedState.put(TIMESTAMP, String.valueOf(timestamp));
int counter = 0;
@@ -301,7 +302,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
final String index = String.valueOf(++counter);
updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier);
}
- stateManager.setState(updatedState, Scope.CLUSTER);
+ stateManager.setState(updatedState, scope);
}
protected String getKey(final String directory) {
@@ -322,7 +323,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// We need to fetch the state from the cluster if we don't yet know the last listing time,
// or if we were just elected the primary node
if (this.lastListingTime == null || electedPrimaryNode) {
- final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
+ final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final Map<String, String> stateValues = stateMap.toMap();
final String timestamp = stateValues.get(TIMESTAMP);
@@ -409,7 +410,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
for (final T entity : newEntries) {
identifiers.add(entity.getIdentifier());
}
- persist(latestListingTimestamp, identifiers, context.getStateManager());
+ persist(latestListingTimestamp, identifiers, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index f992978..b128366 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -150,6 +150,7 @@ public class TailFile extends AbstractProcessor {
properties.add(ROLLING_FILENAME_PATTERN);
properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/" + getIdentifier()).build());
properties.add(START_POSITION);
+ properties.add(FILE_LOCATION);
return properties;
}