You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/03/12 10:42:08 UTC

asterixdb git commit: [NO ISSUE][TX] Ensure Uncommitted Atomic Txns Not Flushed

Repository: asterixdb
Updated Branches:
  refs/heads/master 94a542461 -> c5ca3db26


[NO ISSUE][TX] Ensure Uncommitted Atomic Txns Not Flushed

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Make all metadata indexes modifications as
  force modifications.
- Do not decrement ops of atomic transactions
  until they fully commit or abort to prevent
  flushing partial records.
- Do not schedule flush if a force modification
  starts before the flush log is written to disk.
- Unify code path for completing operations
  after commit/abort in op tracker.
- Remove unneeded update log commit notification.
- Add test case for failing flush due to force
  modification.

Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2456
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c5ca3db2
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c5ca3db2
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c5ca3db2

Branch: refs/heads/master
Commit: c5ca3db262e98ba17001bf295588b3862af2fc2b
Parents: 94a5424
Author: Murtadha Hubail <mh...@apache.org>
Authored: Mon Mar 12 01:21:50 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Mon Mar 12 01:01:17 2018 -0700

----------------------------------------------------------------------
 .../asterix/test/metadata/MetadataTxnTest.java  | 70 ++++++++++++++++++++
 .../context/PrimaryIndexOperationTracker.java   | 53 +++++++++------
 .../transactions/ITransactionContext.java       | 12 +---
 .../apache/asterix/metadata/MetadataNode.java   |  8 +--
 .../management/service/logging/LogBuffer.java   |  4 --
 .../transaction/AbstractTransactionContext.java |  6 +-
 .../transaction/AtomicTransactionContext.java   | 36 +++++-----
 .../EntityLevelTransactionContext.java          | 27 ++++----
 .../org/apache/hyracks/api/util/InvokeUtil.java | 18 +++++
 9 files changed, 161 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 70e5f6e..0f6adf6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -19,8 +19,10 @@
 package org.apache.asterix.test.metadata;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -31,19 +33,25 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.IMetadataIndex;
 import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
+import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -245,6 +253,68 @@ public class MetadataTxnTest {
         }
     }
 
+    @Test
+    public void failedFlushOnUncommittedMetadataTxn() throws Exception {
+        ICcApplicationContext ccAppCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        final MetadataProvider metadataProvider = new MetadataProvider(ccAppCtx, null);
+        final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxn);
+        final String nodeGroupName = "ng";
+        try {
+            final List<String> ngNodes = Collections.singletonList("asterix_nc1");
+            MetadataManager.INSTANCE.addNodegroup(mdTxn, new NodeGroup(nodeGroupName, ngNodes));
+            MetadataManager.INSTANCE.commitTransaction(mdTxn);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        INcApplicationContext appCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        IDatasetLifecycleManager dlcm = appCtx.getDatasetLifecycleManager();
+        dlcm.flushAllDatasets();
+        IMetadataIndex idx = MetadataPrimaryIndexes.NODEGROUP_DATASET;
+        DatasetInfo datasetInfo = dlcm.getDatasetInfo(idx.getDatasetId().getId());
+        AbstractLSMIndex index = (AbstractLSMIndex) appCtx.getDatasetLifecycleManager()
+                .getIndex(idx.getDatasetId().getId(), idx.getResourceId());
+        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+        final MetadataTransactionContext mdTxn2 = MetadataManager.INSTANCE.beginTransaction();
+        int mutableComponentBeforeFlush = index.getCurrentMemoryComponentIndex();
+        int diskComponentsBeforeFlush = index.getDiskComponents().size();
+        // lock opTracker to prevent log flusher from triggering flush
+        synchronized (opTracker) {
+            opTracker.setFlushOnExit(true);
+            opTracker.flushIfNeeded();
+            Assert.assertTrue(opTracker.isFlushLogCreated());
+            metadataProvider.setMetadataTxnContext(mdTxn2);
+            // make sure force operation will processed
+            MetadataManager.INSTANCE.dropNodegroup(mdTxn2, nodeGroupName, false);
+            Assert.assertEquals(1, opTracker.getNumActiveOperations());
+            Assert.assertFalse(index.hasFlushRequestForCurrentMutableComponent());
+            // release opTracker lock now to allow log flusher to schedule the flush
+            InvokeUtil.runWithTimeout(() -> {
+                synchronized (opTracker) {
+                    opTracker.wait(1000);
+                }
+            }, () -> !opTracker.isFlushLogCreated(), 10, TimeUnit.SECONDS);
+        }
+        // ensure flush failed to be scheduled
+        datasetInfo.waitForIO();
+        Assert.assertEquals(mutableComponentBeforeFlush, index.getCurrentMemoryComponentIndex());
+        Assert.assertEquals(diskComponentsBeforeFlush, index.getDiskComponents().size());
+        // after committing, the flush should be scheduled successfully
+        opTracker.setFlushOnExit(true);
+        MetadataManager.INSTANCE.commitTransaction(mdTxn2);
+        metadataProvider.getLocks().unlock();
+        InvokeUtil.runWithTimeout(() -> {
+            synchronized (opTracker) {
+                opTracker.wait(1000);
+            }
+        }, () -> !opTracker.isFlushLogCreated(), 10, TimeUnit.SECONDS);
+        // ensure flush completed successfully and the component was switched
+        datasetInfo.waitForIO();
+        Assert.assertNotEquals(mutableComponentBeforeFlush, index.getCurrentMemoryComponentIndex());
+        Assert.assertEquals(diskComponentsBeforeFlush + 1, index.getDiskComponents().size());
+    }
+
     private void addDataset(ICcApplicationContext appCtx, Dataset source, int datasetPostfix, boolean abort)
             throws Exception {
         Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + datasetPostfix, source.getDataverseName(),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 3886115..47f7ae8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
@@ -95,7 +96,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
     }
 
     public synchronized void flushIfNeeded() throws HyracksDataException {
-        if (numActiveOperations.get() == 0) {
+        if (canSafelyFlush()) {
             flushIfRequested();
         }
     }
@@ -117,7 +118,8 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         }
 
         if (needsFlush || flushOnExit) {
-            //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled.
+            // make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering
+            // them until the current flush is scheduled.
             LSMComponentId primaryId = null;
             for (ILSMIndex lsmIndex : indexes) {
                 ILSMOperationTracker opTracker = lsmIndex.getOperationTracker();
@@ -137,7 +139,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
             LogRecord logRecord = new LogRecord();
             flushOnExit = false;
             if (dsInfo.isDurable()) {
-                /**
+                /*
                  * Generate a FLUSH log.
                  * Flush will be triggered when the log is written to disk by LogFlusher.
                  */
@@ -158,18 +160,30 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
 
     //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
     public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
-        idGenerator.refresh();
-        for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
-            //get resource
-            ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            //update resource lsn
-            AbstractLSMIOOperationCallback ioOpCallback =
-                    (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
-            ioOpCallback.updateLastLSN(logRecord.getLSN());
-            //schedule flush after update
-            accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
+        try {
+            if (!canSafelyFlush()) {
+                // if a force modification operation started before the flush is scheduled, this flush will fail
+                // and a next attempt will be made when that operation completes. This is only expected for metadata
+                // datasets since they always use force modification
+                if (MetadataIndexImmutableProperties.isMetadataDataset(datasetID)) {
+                    return;
+                }
+                throw new IllegalStateException("Operation started while index was pending scheduling a flush");
+            }
+            idGenerator.refresh();
+            for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+                //get resource
+                ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                //update resource lsn
+                AbstractLSMIOOperationCallback ioOpCallback =
+                        (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
+                ioOpCallback.updateLastLSN(logRecord.getLSN());
+                //schedule flush after update
+                accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
+            }
+        } finally {
+            flushLogCreated = false;
         }
-        flushLogCreated = false;
     }
 
     public int getNumActiveOperations() {
@@ -194,14 +208,6 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         }
     }
 
-    public void cleanupNumActiveOperationsForAbortedJob(int numberOfActiveOperations) {
-        numberOfActiveOperations *= -1;
-        numActiveOperations.getAndAdd(numberOfActiveOperations);
-        if (numActiveOperations.get() < 0) {
-            throw new IllegalStateException("The number of active operations cannot be negative!");
-        }
-    }
-
     public boolean isFlushOnExit() {
         return flushOnExit;
     }
@@ -218,4 +224,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         return partition;
     }
 
+    private boolean canSafelyFlush() {
+        return numActiveOperations.get() == 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index a3d5bc5..940535f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -23,9 +23,8 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
 /**
  * A typical transaction lifecycle goes through the following steps:
- * 1. {@link ITransactionContext#register(long, ILSMIndex, IModificationOperationCallback, boolean)}
+ * 1. {@link ITransactionContext#register(long, int, ILSMIndex, IModificationOperationCallback, boolean)}
  * 2. {@link ITransactionContext#beforeOperation(long)}
- * 3. {@link ITransactionContext#notifyUpdateCommitted(long)}
  * 4. {@link ITransactionContext#notifyEntityCommitted}
  * 5. {@link ITransactionContext#afterOperation(long)}
  * 6. {@link ITransactionContext#complete()}
@@ -125,15 +124,6 @@ public interface ITransactionContext {
     void beforeOperation(long resourceId);
 
     /**
-     * Called to notify the transaction that an update log belonging
-     * to this transaction on index with {@code resourceId} has been
-     * flushed to disk.
-     *
-     * @param resourceId
-     */
-    void notifyUpdateCommitted(long resourceId);
-
-    /**
      * Called to notify the transaction that an entity commit
      * log belonging to this transaction has been flushed to
      * disk.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index f81d7da..616d92b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -490,19 +490,17 @@ public class MetadataNode implements IMetadataNode {
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
             switch (op) {
                 case INSERT:
-                    indexAccessor.insert(tuple);
+                    indexAccessor.forceInsert(tuple);
                     break;
                 case DELETE:
-                    indexAccessor.delete(tuple);
+                    indexAccessor.forceDelete(tuple);
                     break;
                 case UPSERT:
-                    indexAccessor.upsert(tuple);
+                    indexAccessor.forceUpsert(tuple);
                     break;
                 default:
                     throw new IllegalStateException("Unknown operation type: " + op);
             }
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
-            opTracker.flushIfNeeded(); // there is a window where the flush is not triggerred after an operation
         } finally {
             datasetLifecycleManager.close(resourceName);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index a630caa..21268e5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -226,10 +226,6 @@ public class LogBuffer implements ILogBuffer {
                         if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
                             txnSubsystem.incrementEntityCommitCount();
                         }
-                    } else if (logRecord.getLogType() == LogType.UPDATE) {
-                        reusableTxnId.setId(logRecord.getTxnId());
-                        txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
-                        txnCtx.notifyUpdateCommitted(logRecord.getResourceId());
                     } else if (logRecord.getLogType() == LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT) {
                         notifyJobTermination();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index b3d5e49..95cabf9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -107,8 +107,8 @@ public abstract class AbstractTransactionContext implements ITransactionContext
     @Override
     public void complete() {
         try {
-            if (txnState.get() == ITransactionManager.ABORTED) {
-                cleanupForAbort();
+            if (isWriteTxn()) {
+                cleanup();
             }
         } finally {
             synchronized (txnOpTrackers) {
@@ -141,5 +141,5 @@ public abstract class AbstractTransactionContext implements ITransactionContext
         return sb.toString();
     }
 
-    protected abstract void cleanupForAbort();
+    protected abstract void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 219cf07..079e99a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -22,8 +22,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -57,16 +57,6 @@ public class AtomicTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void notifyUpdateCommitted(long resourceId) {
-        try {
-            opTrackers.get(resourceId).completeOperation(null, LSMOperationType.MODIFICATION, null,
-                    callbacks.get(resourceId));
-        } catch (HyracksDataException e) {
-            throw new ACIDException(e);
-        }
-    }
-
-    @Override
     public void notifyEntityCommitted(int partition) {
         throw new IllegalStateException("Unexpected entity commit in atomic transaction");
     }
@@ -82,10 +72,26 @@ public class AtomicTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void cleanupForAbort() {
-        // each opTracker should be cleaned
-        opTrackers.forEach((resId, opTracker) -> ((PrimaryIndexOperationTracker) opTracker)
-                .cleanupNumActiveOperationsForAbortedJob(indexPendingOps.get(resId).get()));
+    public void cleanup() {
+        switch (getTxnState()) {
+            case ITransactionManager.ABORTED:
+            case ITransactionManager.COMMITTED:
+                for (Map.Entry<Long, ILSMOperationTracker> opTracker : opTrackers.entrySet()) {
+                    try {
+                        final long resId = opTracker.getKey();
+                        final int idxPendingOps = indexPendingOps.get(resId).intValue();
+                        for (int i = 0; i < idxPendingOps; i++) {
+                            opTracker.getValue().completeOperation(null, LSMOperationType.FORCE_MODIFICATION, null,
+                                    callbacks.get(resId));
+                        }
+                    } catch (HyracksDataException e) {
+                        throw new ACIDException(e);
+                    }
+                }
+                break;
+            default:
+                throw new IllegalStateException("invalid state in txn clean up: " + getTxnState());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9d2f54b..9fcb08b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -20,11 +20,11 @@ package org.apache.asterix.transaction.management.service.transaction;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -56,8 +56,7 @@ public class EntityLevelTransactionContext extends AbstractTransactionContext {
             resourcePendingOps.put(resourceId, pendingOps);
             if (primaryIndex) {
                 Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
-                        new Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>(
-                                (PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
+                        new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
                 primaryIndexTrackers.put(partition, pair);
             }
         }
@@ -69,11 +68,6 @@ public class EntityLevelTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void notifyUpdateCommitted(long resourceId) {
-        // no op
-    }
-
-    @Override
     public void notifyEntityCommitted(int partition) {
         try {
             Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
@@ -90,11 +84,18 @@ public class EntityLevelTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    protected void cleanupForAbort() {
-        for (Entry<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>> e : primaryIndexTrackers
-                .entrySet()) {
-            AtomicInteger pendingOps = partitionPendingOps.get(e.getKey());
-            e.getValue().first.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
+    protected void cleanup() {
+        if (getTxnState() == ITransactionManager.ABORTED) {
+            primaryIndexTrackers.forEach((partitionId, opTracker) -> {
+                int pendingOps = partitionPendingOps.get(partitionId).intValue();
+                for (int i = 0; i < pendingOps; i++) {
+                    try {
+                        opTracker.first.completeOperation(null, LSMOperationType.MODIFICATION, null, opTracker.second);
+                    } catch (HyracksDataException ex) {
+                        throw new ACIDException(ex);
+                    }
+                }
+            });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5ca3db2/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index c60e673..fb2bdeb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BooleanSupplier;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.IOInterruptibleAction;
@@ -234,4 +236,20 @@ public class InvokeUtil {
             }
         }
     }
+
+    /**
+     * Runs the supplied {@code action} until {@code stopCondition} is met or timeout.
+     */
+    public static void runWithTimeout(ThrowingAction action, BooleanSupplier stopCondition, long timeout, TimeUnit unit)
+            throws Exception {
+        long remainingTime = unit.toNanos(timeout);
+        final long startTime = System.nanoTime();
+        while (!stopCondition.getAsBoolean()) {
+            if (remainingTime <= 0) {
+                throw new TimeoutException("Stop condition was not met after " + unit.toSeconds(timeout) + " seconds.");
+            }
+            action.run();
+            remainingTime -= System.nanoTime() - startTime;
+        }
+    }
 }