You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/03/22 01:11:30 UTC

[5/6] asterixdb git commit: [ASTERIXDB-1708][TX] Prevent log deletion during scan

[ASTERIXDB-1708][TX] Prevent log deletion during scan

Right now there is a potential for a soft checkpoint to delete a
log file that is about to be read as part of a transaction rollback.
This patch stops the soft checkpoint from proceeding if a rollback
is about to take place and vice-versa.

Change-Id: Icff1a520af24c8fac8e5836cdbf46425b78b1260
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2508
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e6587f62
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e6587f62
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e6587f62

Branch: refs/heads/master
Commit: e6587f628a72f10594e676ec375c1cf26e73ec53
Parents: 01d7fe9
Author: Ian Maxon <im...@apache.org>
Authored: Wed Mar 21 11:58:21 2018 -0700
Committer: Ian Maxon <im...@apache.org>
Committed: Wed Mar 21 13:51:47 2018 -0700

----------------------------------------------------------------------
 .../apache/asterix/app/nc/RecoveryManager.java  |  19 +--
 .../asterix/test/logging/CheckpointingTest.java | 116 ++++++++++++++++---
 .../common/transactions/ICheckpointManager.java |  17 ++-
 .../management/service/logging/LogManager.java  |  61 +++-------
 .../service/recovery/CheckpointManager.java     |  28 ++++-
 .../service/transaction/TransactionManager.java |   5 +-
 6 files changed, 173 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4a2cf2d..4b14a9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -59,6 +59,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -104,6 +105,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     private SystemState state;
     private final INCServiceContext serviceCtx;
     private final INcApplicationContext appCtx;
+    private static final TxnId recoveryTxnId = new TxnId(-1);
 
     public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) {
         this.serviceCtx = serviceCtx;
@@ -505,21 +507,24 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     @Override
-    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
-        long minLSN = getPartitionsMinLSN(partitions);
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        if (minLSN < readableSmallestLSN) {
-            minLSN = readableSmallestLSN;
-        }
-
+    public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush)
+            throws HyracksDataException {
         //replay logs > minLSN that belong to these partitions
         try {
+            checkpointManager.secure(recoveryTxnId);
+            long minLSN = getPartitionsMinLSN(partitions);
+            long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+            if (minLSN < readableSmallestLSN) {
+                minLSN = readableSmallestLSN;
+            }
             replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
             if (flush) {
                 appCtx.getDatasetLifecycleManager().flushAllDatasets();
             }
         } catch (IOException | ACIDException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            checkpointManager.completed(recoveryTxnId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 91d98e5..418282e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -28,16 +28,19 @@ import java.util.List;
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.RecoveryManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -50,20 +53,23 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.StorageUtil.StorageUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 public class CheckpointingTest {
 
@@ -88,6 +94,8 @@ public class CheckpointingTest {
     private static final String DATASET_NAME = "TestDS";
     private static final String DATA_TYPE_NAME = "DUMMY";
     private static final String NODE_GROUP_NAME = "DEFAULT";
+    private volatile boolean threadException = false;
+    private Throwable exception = null;
 
     @Before
     public void setUp() throws Exception {
@@ -128,7 +136,7 @@ public class CheckpointingTest {
                 VSizeFrame frame = new VSizeFrame(ctx);
                 FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
 
-                IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager();
+                RecoveryManager recoveryManager = (RecoveryManager) nc.getTransactionSubsystem().getRecoveryManager();
                 ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
                 LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
                 // Number of log files after node startup should be one
@@ -178,20 +186,70 @@ public class CheckpointingTest {
 
                 /*
                  * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
-                 * a checkpoint should delete it.
+                 * a checkpoint should delete it. We will also start a second
+                  * job to ensure that the checkpointing coexists peacefully
+                  * with other concurrent readers of the log that request
+                  * deletions to be witheld
                  */
-                checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
 
-                // Validate initialLowWaterMarkFileId was deleted
-                for (Long fileId : logManager.getLogFileIds()) {
-                    Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+                JobId jobId2 = nc.newJobId();
+                IHyracksTaskContext ctx2 = nc.createTestContext(jobId2, 0, false);
+                nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
+                        new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+                // Prepare insert operation
+                LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+                insertOp2.open();
+                VSizeFrame frame2 = new VSizeFrame(ctx2);
+                FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
+                for (int i = 0; i < 4; i++) {
+                    long lastCkpoint = recoveryManager.getMinFirstLSN();
+                    long lastFileId = logManager.getLogFileId(lastCkpoint);
+
+                    checkpointManager.tryCheckpoint(lowWaterMarkLSN);
+                    // Validate initialLowWaterMarkFileId was deleted
+                    for (Long fileId : logManager.getLogFileIds()) {
+                        Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+                    }
+
+                    while (currentLowWaterMarkLogFileId == lastFileId) {
+                        ITupleReference tuple = tupleGenerator.next();
+                        DataflowUtils.addTupleToFrame(tupleAppender2, tuple, insertOp2);
+                        lowWaterMarkLSN = recoveryManager.getMinFirstLSN();
+                        currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+                    }
                 }
+                Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+                    public void uncaughtException(Thread th, Throwable ex) {
+                        threadException = true;
+                        exception = ex;
+                    }
+                };
+
+                Thread t = new Thread(() -> {
+                    TransactionManager spyTxnMgr = spy((TransactionManager) nc.getTransactionManager());
+                    doAnswer((Answer) i -> {
+                        stallAbortTxn(Thread.currentThread(), txnCtx, nc.getTransactionSubsystem(),
+                                (TxnId) i.getArguments()[0]);
+                        return null;
+                    }).when(spyTxnMgr).abortTransaction(any(TxnId.class));
 
-                if (tupleAppender.getTupleCount() > 0) {
-                    tupleAppender.write(insertOp, true);
+                    spyTxnMgr.abortTransaction(txnCtx.getTxnId());
+                });
+                t.setUncaughtExceptionHandler(h);
+                synchronized (t) {
+                    t.start();
+                    t.wait();
+                }
+                long lockedLSN = recoveryManager.getMinFirstLSN();
+                checkpointManager.tryCheckpoint(lockedLSN);
+                synchronized (t) {
+                    t.notifyAll();
+                }
+                t.join();
+                if (threadException) {
+                    throw exception;
                 }
-                insertOp.close();
-                nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             } finally {
                 nc.deInit();
             }
@@ -201,6 +259,32 @@ public class CheckpointingTest {
         }
     }
 
+    private void stallAbortTxn(Thread t, ITransactionContext txnCtx, ITransactionSubsystem txnSubsystem, TxnId txnId)
+            throws InterruptedException, HyracksDataException {
+
+        try {
+            if (txnCtx.isWriteTxn()) {
+                LogRecord logRecord = new LogRecord();
+                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+                txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getCheckpointManager().secure(txnId);
+                synchronized (t) {
+                    t.notifyAll();
+                    t.wait();
+                }
+                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+                txnCtx.setTxnState(ITransactionManager.ABORTED);
+            }
+        } catch (ACIDException | HyracksDataException e) {
+            String msg = "Could not complete rollback! System is in an inconsistent state";
+            throw new ACIDException(msg, e);
+        } finally {
+            txnCtx.complete();
+            txnSubsystem.getLockManager().releaseLocks(txnCtx);
+            txnSubsystem.getCheckpointManager().completed(txnId);
+        }
+    }
+
     @Test
     public void testCorruptedCheckpointFiles() {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 9e7eb0d..e3cf8b8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -46,4 +46,19 @@ public interface ICheckpointManager extends ILifeCycleComponent {
      * @throws HyracksDataException
      */
     long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
-}
\ No newline at end of file
+
+    /**
+     * Secures the current low-water mark until the transaction identified by {@code id} completes.
+     *
+     * @param id
+     * @throws HyracksDataException
+     */
+    void secure(TxnId id) throws HyracksDataException;
+
+    /**
+     * Notifies this {@link ICheckpointManager} that the transaction identified by {@code id} completed.
+     *
+     * @param id
+     */
+    void completed(TxnId id);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 3ada608..736de07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -76,7 +76,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     private final String logFilePrefix;
     private final MutableLong flushLSN;
     private final String nodeId;
-    private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
     private final long logFileSize;
     private final int logPageSize;
     private final AtomicLong appendLSN;
@@ -407,24 +406,20 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
             /**
              * At this point, any future LogReader should read from LSN >= checkpointLSN
              */
-            synchronized (txnLogFileId2ReaderCount) {
-                for (Long id : logFileIds) {
-                    /**
-                     * Stop deletion if:
-                     * The log file which contains the checkpointLSN has been reached.
-                     * The oldest log file being accessed by a LogReader has been reached.
-                     */
-                    if (id >= checkpointLSNLogFileID
-                            || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
-                        break;
-                    }
-                    //delete old log file
-                    File file = new File(getLogFilePath(id));
-                    file.delete();
-                    txnLogFileId2ReaderCount.remove(id);
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("Deleted log file " + file.getAbsolutePath());
-                    }
+            for (Long id : logFileIds) {
+                /**
+                 * Stop deletion if:
+                 * The log file which contains the checkpointLSN has been reached.
+                 * The oldest log file being accessed by a LogReader has been reached.
+                 */
+                if (id >= checkpointLSNLogFileID) {
+                    break;
+                }
+                //delete old log file
+                File file = new File(getLogFilePath(id));
+                file.delete();
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Deleted log file " + file.getAbsolutePath());
                 }
             }
         }
@@ -450,7 +445,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     }
 
     private long deleteAllLogFiles() {
-        txnLogFileId2ReaderCount.clear();
         List<Long> logFileIds = getLogFileIds();
         if (!logFileIds.isEmpty()) {
             for (Long id : logFileIds) {
@@ -607,7 +601,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r");
         FileChannel newFileChannel = raf.getChannel();
         TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize);
-        touchLogFile(fileId);
         return logFile;
     }
 
@@ -617,32 +610,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
             LOGGER.warn(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel.");
         }
         fileChannel.close();
-        untouchLogFile(logFileRef.getLogFileId());
-    }
-
-    private void touchLogFile(long fileId) {
-        synchronized (txnLogFileId2ReaderCount) {
-            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
-                txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1);
-            } else {
-                txnLogFileId2ReaderCount.put(fileId, 1);
-            }
-        }
-    }
-
-    private void untouchLogFile(long fileId) {
-        synchronized (txnLogFileId2ReaderCount) {
-            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
-                int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
-                if (newReaderCount < 0) {
-                    throw new IllegalStateException(
-                            "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")");
-                }
-                txnLogFileId2ReaderCount.put(fileId, newReaderCount);
-            } else {
-                throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened.");
-            }
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index a541bd9..6efd0e5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -22,10 +22,15 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * An implementation of {@link ICheckpointManager} that defines the logic
  * of checkpoints.
@@ -33,9 +38,12 @@ import org.apache.logging.log4j.Logger;
 public class CheckpointManager extends AbstractCheckpointManager {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final long NO_SECURED_LSN = -1l;
+    private final Map<TxnId, Long> securedLSNs;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
+        securedLSNs = new HashMap<>();
     }
 
     /**
@@ -62,6 +70,10 @@ public class CheckpointManager extends AbstractCheckpointManager {
     @Override
     public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
         LOGGER.info("Attemping soft checkpoint...");
+        final long minSecuredLSN = getMinSecuredLSN();
+        if (minSecuredLSN != NO_SECURED_LSN && checkpointTargetLSN >= minSecuredLSN) {
+            return minSecuredLSN;
+        }
         final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
         if (!checkpointSucceeded) {
@@ -77,4 +89,18 @@ public class CheckpointManager extends AbstractCheckpointManager {
         }
         return minFirstLSN;
     }
-}
\ No newline at end of file
+
+    @Override
+    public synchronized void secure(TxnId id) throws HyracksDataException {
+        securedLSNs.put(id, txnSubsystem.getRecoveryManager().getMinFirstLSN());
+    }
+
+    @Override
+    public synchronized void completed(TxnId id) {
+        securedLSNs.remove(id);
+    }
+
+    private synchronized long getMinSecuredLSN() {
+        return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 76ecc63..c218dec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -33,6 +33,7 @@ import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.Level;
@@ -103,10 +104,11 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
                 LogRecord logRecord = new LogRecord();
                 TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
                 txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getCheckpointManager().secure(txnId);
                 txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
                 txnCtx.setTxnState(ITransactionManager.ABORTED);
             }
-        } catch (ACIDException e) {
+        } catch (HyracksDataException e) {
             String msg = "Could not complete rollback! System is in an inconsistent state";
             if (LOGGER.isErrorEnabled()) {
                 LOGGER.log(Level.ERROR, msg, e);
@@ -116,6 +118,7 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
             txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             txnCtxRepository.remove(txnCtx.getTxnId());
+            txnSubsystem.getCheckpointManager().completed(txnId);
         }
     }