You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/07/03 00:11:20 UTC

[asterixdb] 02/02: Merge commit '0561d10' from stabilization-f69489

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 610f6f5fb4d120e45a94b52ccf9b72000c3a72a5
Merge: f89de3d 0561d10
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Tue Jul 2 21:26:24 2019 +0300

    Merge commit '0561d10' from stabilization-f69489
    
    Change-Id: I18115329ce7ab3501e7fcc9c6dc06d0e87c97688

 .../asterix/common/transactions/ICheckpointManager.java | 12 +++++++++++-
 .../asterix/replication/sync/ReplicaSynchronizer.java   | 17 +++++++++++------
 .../management/service/recovery/CheckpointManager.java  | 16 +++++++++++++++-
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index ac652e3,954e399..6926dac
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@@ -60,9 -60,12 +60,19 @@@ public interface ICheckpointManager ext
      void completed(TxnId id);
  
      /**
 +     * Checkpoints idle datasets by flushing their in-memory component to disk if needed.
 +     *
 +     * @throws HyracksDataException
 +     */
 +    void checkpointIdleDatasets() throws HyracksDataException;
- }
++
++    /**
+      * Suspends checkpointing datasets
+      */
+     void suspend();
+ 
+     /**
+      * Resumes checkpointing datasets
+      */
+     void resume();
 -}
++}
diff --cc asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index ba945be,b85742e..43758d3
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@@ -43,8 -39,8 +43,9 @@@ public class CheckpointManager extends 
  
      private static final Logger LOGGER = LogManager.getLogger();
      private static final long NO_SECURED_LSN = -1L;
 +    private final long datasetCheckpointInterval;
      private final Map<TxnId, Long> securedLSNs;
+     private boolean suspended = false;
  
      public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
          super(txnSubsystem, checkpointProperties);
@@@ -82,10 -77,11 +83,10 @@@
          }
          final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
          boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
-         if (!checkpointSucceeded) {
+         if (!checkpointSucceeded && !suspended) {
              // Flush datasets with indexes behind target checkpoint LSN
 -            IDatasetLifecycleManager datasetLifecycleManager =
 -                    txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
 -            datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
 +            final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
 +            dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
          }
          capture(minFirstLSN, false);
          if (checkpointSucceeded) {
@@@ -106,11 -102,15 +107,24 @@@
      }
  
      @Override
 +    public synchronized void checkpointIdleDatasets() throws HyracksDataException {
++        if (suspended) {
++            return;
++        }
 +        final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
 +        dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
 +    }
 +
++    @Override
+     public synchronized void suspend() {
+         suspended = true;
+     }
+ 
+     @Override
+     public synchronized void resume() {
+         suspended = false;
+     }
+ 
      private synchronized long getMinSecuredLSN() {
          return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
      }