You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/01/05 19:00:27 UTC
nifi git commit: NIFI-3274 Adding WriteAheadLog configuration options
to WriteAheadLogLocalStateProvider
Repository: nifi
Updated Branches:
refs/heads/master 68057cb4a -> 273e69f2c
NIFI-3274 Adding WriteAheadLog configuration options to WriteAheadLogLocalStateProvider
This closes #1386.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/273e69f2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/273e69f2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/273e69f2
Branch: refs/heads/master
Commit: 273e69f2cb833fd8cdcc99f70b8ab23fcc16d5f0
Parents: 68057cb
Author: jpercivall <JP...@apache.org>
Authored: Tue Jan 3 18:13:54 2017 -0500
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Jan 5 19:59:58 2017 +0100
----------------------------------------------------------------------
.../local/WriteAheadLocalStateProvider.java | 55 +++++++++++++++++---
.../local/TestWriteAheadLocalStateProvider.java | 3 ++
.../main/resources/conf/state-management.xml | 8 +++
3 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
index fc691fb..e341c4c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
@@ -54,6 +54,8 @@ import org.wali.WriteAheadRepository;
public class WriteAheadLocalStateProvider extends AbstractStateProvider {
private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
+ private volatile boolean alwaysSync;
+
private final StateMapSerDe serde;
private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<>();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory());
@@ -66,6 +68,33 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
.required(true)
.build();
+ static final PropertyDescriptor ALWAYS_SYNC = new PropertyDescriptor.Builder()
+ .name("Always Sync")
+ .description("If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very " +
+ "expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the " +
+ "operating system crashes. The default value is false.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
+ .name("Partitions")
+ .description("The number of partitions.")
+ .addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true))
+ .defaultValue("16")
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
+ .name("Checkpoint Interval")
+ .description("The amount of time between checkpoints.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("2 mins")
+ .required(true)
+ .build();
+
+
private WriteAheadRepository<StateMapUpdate> writeAheadLog;
private AtomicLong versionGenerator;
@@ -75,6 +104,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
@Override
public synchronized void init(final StateProviderInitializationContext context) throws IOException {
+ long checkpointIntervalMillis = context.getProperty(CHECKPOINT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+ int numPartitions = context.getProperty(NUM_PARTITIONS).asInteger();
+ alwaysSync = context.getProperty(ALWAYS_SYNC).asBoolean();
+
+
final File basePath = new File(context.getProperty(PATH).getValue());
if (!basePath.exists() && !basePath.mkdirs()) {
@@ -94,7 +128,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
}
versionGenerator = new AtomicLong(-1L);
- writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 16, serde, null);
+ writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), numPartitions, serde, null);
final Collection<StateMapUpdate> updates = writeAheadLog.recoverRecords();
long maxRecordVersion = -1L;
@@ -110,7 +144,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
}
final String componentId = update.getComponentId();
- componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap()));
+ componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap(), alwaysSync));
}
// keep a separate maxRecordVersion and set it at the end so that we don't have to continually update an AtomicLong, which is more
@@ -118,13 +152,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
// the init() method completes, this is okay to do.
versionGenerator.set(maxRecordVersion);
- executor.scheduleWithFixedDelay(new CheckpointTask(), 2, 2, TimeUnit.MINUTES);
+ executor.scheduleWithFixedDelay(new CheckpointTask(), checkpointIntervalMillis, checkpointIntervalMillis, TimeUnit.MILLISECONDS);
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(PATH);
+ properties.add(ALWAYS_SYNC);
+ properties.add(CHECKPOINT_INTERVAL);
+ properties.add(NUM_PARTITIONS);
return properties;
}
@@ -144,7 +181,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
ComponentProvider componentProvider = componentProviders.get(componentId);
if (componentProvider == null) {
final StateMap stateMap = new StandardStateMap(Collections.<String, String> emptyMap(), -1L);
- componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap);
+ componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap, alwaysSync);
final ComponentProvider existingComponentProvider = componentProviders.putIfAbsent(componentId, componentProvider);
if (existingComponentProvider != null) {
@@ -190,14 +227,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
private final AtomicLong versionGenerator;
private final WriteAheadRepository<StateMapUpdate> wal;
private final String componentId;
+ private final boolean alwaysSync;
private StateMap stateMap;
- public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap) {
+ public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap, final boolean alwaysSync) {
this.wal = wal;
this.versionGenerator = versionGenerator;
this.componentId = componentId;
this.stateMap = stateMap;
+ this.alwaysSync = alwaysSync;
}
public synchronized StateMap getState() throws IOException {
@@ -211,7 +250,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
public synchronized void setState(final Map<String, String> state) throws IOException {
stateMap = new StandardStateMap(state, versionGenerator.incrementAndGet());
final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
- wal.update(Collections.singleton(updateRecord), false);
+ wal.update(Collections.singleton(updateRecord), alwaysSync);
}
// see above explanation as to why this method is synchronized.
@@ -227,14 +266,14 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
stateMap = new StandardStateMap(new HashMap<>(newValue), versionGenerator.incrementAndGet());
final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
- wal.update(Collections.singleton(updateRecord), false);
+ wal.update(Collections.singleton(updateRecord), alwaysSync);
return true;
}
public synchronized void clear() throws IOException {
stateMap = new StandardStateMap(null, versionGenerator.incrementAndGet());
final StateMapUpdate update = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
- wal.update(Collections.singleton(update), false);
+ wal.update(Collections.singleton(update), alwaysSync);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
index 3a6310f..ac0b030 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
@@ -45,6 +45,9 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider
provider = new WriteAheadLocalStateProvider();
final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>();
properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null));
+ properties.put(WriteAheadLocalStateProvider.ALWAYS_SYNC, new StandardPropertyValue("false", null));
+ properties.put(WriteAheadLocalStateProvider.CHECKPOINT_INTERVAL, new StandardPropertyValue("2 mins", null));
+ properties.put(WriteAheadLocalStateProvider.NUM_PARTITIONS, new StandardPropertyValue("16", null));
provider.initialize(new StateProviderInitializationContext() {
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index d4a13cf..d7631c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -25,11 +25,19 @@
Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
is important that the directory be copied over to the new version when upgrading NiFi.
+ Always Sync - If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very
+ expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the
+ operating system crashes. The default value is false.
+ Partitions - The number of partitions.
+ Checkpoint Interval - The amount of time between checkpoints.
-->
<local-provider>
<id>local-provider</id>
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
<property name="Directory">./state/local</property>
+ <property name="Always Sync">false</property>
+ <property name="Partitions">16</property>
+ <property name="Checkpoint Interval">2 mins</property>
</local-provider>
<!--