You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/01/05 19:00:27 UTC

nifi git commit: NIFI-3274 Adding WriteAheadLog configuration options to WriteAheadLocalStateProvider

Repository: nifi
Updated Branches:
  refs/heads/master 68057cb4a -> 273e69f2c


NIFI-3274 Adding WriteAheadLog configuration options to WriteAheadLocalStateProvider

This closes #1386.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/273e69f2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/273e69f2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/273e69f2

Branch: refs/heads/master
Commit: 273e69f2cb833fd8cdcc99f70b8ab23fcc16d5f0
Parents: 68057cb
Author: jpercivall <JP...@apache.org>
Authored: Tue Jan 3 18:13:54 2017 -0500
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Jan 5 19:59:58 2017 +0100

----------------------------------------------------------------------
 .../local/WriteAheadLocalStateProvider.java     | 55 +++++++++++++++++---
 .../local/TestWriteAheadLocalStateProvider.java |  3 ++
 .../main/resources/conf/state-management.xml    |  8 +++
 3 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
index fc691fb..e341c4c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
@@ -54,6 +54,8 @@ import org.wali.WriteAheadRepository;
 public class WriteAheadLocalStateProvider extends AbstractStateProvider {
     private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
 
+    private volatile boolean alwaysSync;
+
     private final StateMapSerDe serde;
     private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<>();
     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory());
@@ -66,6 +68,33 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
         .required(true)
         .build();
 
+    static final PropertyDescriptor ALWAYS_SYNC = new PropertyDescriptor.Builder()
+        .name("Always Sync")
+        .description("If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very " +
+                "expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the " +
+                "operating system crashes. The default value is false.")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
+        .name("Partitions")
+        .description("The number of partitions.")
+        .addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true))
+        .defaultValue("16")
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
+        .name("Checkpoint Interval")
+        .description("The amount of time between checkpoints.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("2 mins")
+        .required(true)
+        .build();
+
+
     private WriteAheadRepository<StateMapUpdate> writeAheadLog;
     private AtomicLong versionGenerator;
 
@@ -75,6 +104,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
 
     @Override
     public synchronized void init(final StateProviderInitializationContext context) throws IOException {
+        long checkpointIntervalMillis = context.getProperty(CHECKPOINT_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+        int numPartitions = context.getProperty(NUM_PARTITIONS).asInteger();
+        alwaysSync = context.getProperty(ALWAYS_SYNC).asBoolean();
+
+
         final File basePath = new File(context.getProperty(PATH).getValue());
 
         if (!basePath.exists() && !basePath.mkdirs()) {
@@ -94,7 +128,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
         }
 
         versionGenerator = new AtomicLong(-1L);
-        writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 16, serde, null);
+        writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), numPartitions, serde, null);
 
         final Collection<StateMapUpdate> updates = writeAheadLog.recoverRecords();
         long maxRecordVersion = -1L;
@@ -110,7 +144,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
             }
 
             final String componentId = update.getComponentId();
-            componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap()));
+            componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap(), alwaysSync));
         }
 
         // keep a separate maxRecordVersion and set it at the end so that we don't have to continually update an AtomicLong, which is more
@@ -118,13 +152,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
         // the init() method completes, this is okay to do.
         versionGenerator.set(maxRecordVersion);
 
-        executor.scheduleWithFixedDelay(new CheckpointTask(), 2, 2, TimeUnit.MINUTES);
+        executor.scheduleWithFixedDelay(new CheckpointTask(), checkpointIntervalMillis, checkpointIntervalMillis, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(PATH);
+        properties.add(ALWAYS_SYNC);
+        properties.add(CHECKPOINT_INTERVAL);
+        properties.add(NUM_PARTITIONS);
         return properties;
     }
 
@@ -144,7 +181,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
         ComponentProvider componentProvider = componentProviders.get(componentId);
         if (componentProvider == null) {
             final StateMap stateMap = new StandardStateMap(Collections.<String, String> emptyMap(), -1L);
-            componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap);
+            componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap, alwaysSync);
 
             final ComponentProvider existingComponentProvider = componentProviders.putIfAbsent(componentId, componentProvider);
             if (existingComponentProvider != null) {
@@ -190,14 +227,16 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
         private final AtomicLong versionGenerator;
         private final WriteAheadRepository<StateMapUpdate> wal;
         private final String componentId;
+        private final boolean alwaysSync;
 
         private StateMap stateMap;
 
-        public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap) {
+        public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap, final boolean alwaysSync) {
             this.wal = wal;
             this.versionGenerator = versionGenerator;
             this.componentId = componentId;
             this.stateMap = stateMap;
+            this.alwaysSync = alwaysSync;
         }
 
         public synchronized StateMap getState() throws IOException {
@@ -211,7 +250,7 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
         public synchronized void setState(final Map<String, String> state) throws IOException {
             stateMap = new StandardStateMap(state, versionGenerator.incrementAndGet());
             final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
-            wal.update(Collections.singleton(updateRecord), false);
+            wal.update(Collections.singleton(updateRecord), alwaysSync);
         }
 
         // see above explanation as to why this method is synchronized.
@@ -227,14 +266,14 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
 
             stateMap = new StandardStateMap(new HashMap<>(newValue), versionGenerator.incrementAndGet());
             final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
-            wal.update(Collections.singleton(updateRecord), false);
+            wal.update(Collections.singleton(updateRecord), alwaysSync);
             return true;
         }
 
         public synchronized void clear() throws IOException {
             stateMap = new StandardStateMap(null, versionGenerator.incrementAndGet());
             final StateMapUpdate update = new StateMapUpdate(stateMap, componentId, UpdateType.UPDATE);
-            wal.update(Collections.singleton(update), false);
+            wal.update(Collections.singleton(update), alwaysSync);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
index 3a6310f..ac0b030 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
@@ -45,6 +45,9 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider
         provider = new WriteAheadLocalStateProvider();
         final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>();
         properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null));
+        properties.put(WriteAheadLocalStateProvider.ALWAYS_SYNC, new StandardPropertyValue("false", null));
+        properties.put(WriteAheadLocalStateProvider.CHECKPOINT_INTERVAL, new StandardPropertyValue("2 mins", null));
+        properties.put(WriteAheadLocalStateProvider.NUM_PARTITIONS, new StandardPropertyValue("16", null));
 
         provider.initialize(new StateProviderInitializationContext() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/273e69f2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index d4a13cf..d7631c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -25,11 +25,19 @@
         
         Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it
                     is important that the directory be copied over to the new version when upgrading NiFi.
+        Always Sync - If set to true, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very
+                expensive and can significantly reduce NiFi performance. However, if it is false, there could be the potential for data loss if either there is a sudden power loss or the
+                operating system crashes. The default value is false.
+        Partitions - The number of partitions.
+        Checkpoint Interval - The amount of time between checkpoints.
      -->
     <local-provider>
         <id>local-provider</id>
         <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
         <property name="Directory">./state/local</property>
+        <property name="Always Sync">false</property>
+        <property name="Partitions">16</property>
+        <property name="Checkpoint Interval">2 mins</property>
     </local-provider>
 
     <!--