You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2017/11/16 23:27:11 UTC

[2/2] asterixdb git commit: [NO ISSUE][TX] Introduce Atomic Transactions

[NO ISSUE][TX] Introduce Atomic Transactions

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Redesign and simplify ITransactionManager API
  - Redesign and simplify ITransactionContext API

Details:
- Introduce atomic transactions. Unlike entity level transaction,
  atomic transaction do not generate any entity commit logs and
  may modify multiple primary indexes. Therefore, either all the
  operations of an atomic transaction will be committed or nothing.
  Atomic transactions  are used by metadata transactions, while other
  transactions still use entity level transactions.
- Add index resource id to AbstractOperationCallback.
- Refactor metadata index modification code.
- Remove unused class MutableResourceId
- Remove unused class FieldsHashValueGenerator
- Add test case for concurrent metadata transactions.

Change-Id: I13db1c15f8afbdaae608ff0a7468fe62bf1daccd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2156
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/5070d633
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/5070d633
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/5070d633

Branch: refs/heads/master
Commit: 5070d633eaee536c20706e59891a44a6257d8bd8
Parents: 893d385
Author: Murtadha Hubail <mh...@apache.org>
Authored: Thu Nov 16 22:30:35 2017 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Thu Nov 16 15:26:25 2017 -0800

----------------------------------------------------------------------
 .../test/dataflow/ComponentRollbackTest.java    |  30 +-
 .../asterix/test/dataflow/LogMarkerTest.java    |   9 +-
 .../asterix/test/logging/CheckpointingTest.java |   9 +-
 .../asterix/test/metadata/MetadataTxnTest.java  |  95 +++++++
 .../asterix/test/storage/DiskIsFullTest.java    |   2 -
 .../context/PrimaryIndexOperationTracker.java   |   4 +-
 .../common/exceptions/ACIDException.java        |   2 +-
 .../transactions/AbstractOperationCallback.java |  17 +-
 .../transactions/ITransactionContext.java       | 131 +++++++--
 .../transactions/ITransactionManager.java       | 102 +++----
 .../common/transactions/TransactionOptions.java |  35 +++
 .../apache/asterix/metadata/MetadataNode.java   | 177 ++++--------
 .../job/listener/JobEventListenerFactory.java   |  16 +-
 ...tiTransactionJobletEventListenerFactory.java |  15 +-
 .../std/FlushDatasetOperatorDescriptor.java     |   2 +-
 ...tractIndexModificationOperationCallback.java |   4 +-
 .../LockThenSearchOperationCallback.java        |   4 +-
 .../LockThenSearchOperationCallbackFactory.java |   6 +-
 ...maryIndexInstantSearchOperationCallback.java |   4 +-
 ...exInstantSearchOperationCallbackFactory.java |  14 +-
 ...dexModificationOperationCallbackFactory.java |   5 +-
 .../PrimaryIndexSearchOperationCallback.java    |   6 +-
 ...maryIndexSearchOperationCallbackFactory.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   5 +-
 .../SecondaryIndexSearchOperationCallback.java  |   4 +-
 ...daryIndexSearchOperationCallbackFactory.java |   2 +-
 ...dexModificationOperationCallbackFactory.java |   5 +-
 ...dexModificationOperationCallbackFactory.java |   5 +-
 .../UpsertOperationCallbackFactory.java         |   5 +-
 .../management/runtime/CommitRuntime.java       |   6 +-
 .../management/service/logging/LogBuffer.java   |  14 +-
 .../transaction/AbstractTransactionContext.java | 145 ++++++++++
 .../transaction/AtomicTransactionContext.java   | 107 ++++++++
 .../EntityLevelTransactionContext.java          | 103 +++++++
 .../transaction/FieldsHashValueGenerator.java   |  41 ---
 .../service/transaction/MutableResourceId.java  |  48 ----
 .../service/transaction/TransactionContext.java | 271 -------------------
 .../transaction/TransactionContextFactory.java  |  43 +++
 .../service/transaction/TransactionManager.java | 168 +++++-------
 39 files changed, 895 insertions(+), 770 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 00b185d..b09e000 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -39,8 +39,9 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -146,7 +147,8 @@ public class ComponentRollbackTest {
         lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
         nc.newJobId();
-        txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, storageManager).getLeft();
     }
@@ -188,7 +190,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
 
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
@@ -239,8 +241,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
-
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
@@ -255,7 +256,8 @@ public class ComponentRollbackTest {
 
             // insert again
             nc.newJobId();
-            txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+            txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                    new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
             insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                     KEY_INDICATORS_LIST, storageManager).getLeft();
             insertOp.open();
@@ -267,7 +269,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -308,7 +310,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
 
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
@@ -378,7 +380,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
@@ -434,7 +436,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
@@ -498,7 +500,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
@@ -558,7 +560,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
@@ -620,7 +622,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
@@ -691,7 +693,7 @@ public class ComponentRollbackTest {
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 82eb16a..1907ed6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -31,9 +31,10 @@ import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -119,8 +120,8 @@ public class LogMarkerTest {
                         storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
-                ITransactionContext txnCtx =
-                        nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+                ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                        new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
                 insertOp.open();
@@ -147,7 +148,7 @@ public class LogMarkerTest {
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+                nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
                 IndexDataflowHelperFactory iHelperFactory =
                         new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider());
                 IIndexDataflowHelper dataflowHelper =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 5384c92..1cc24d5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -34,11 +34,12 @@ import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Property;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.Checkpoint;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -128,8 +129,8 @@ public class CheckpointingTest {
                         KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx =
-                        nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+                ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                        new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 // Prepare insert operation
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();
@@ -202,7 +203,7 @@ public class CheckpointingTest {
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+                nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             } finally {
                 nc.deInit();
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 3969ec5..3e906b4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -21,11 +21,20 @@ package org.apache.asterix.test.metadata;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -36,7 +45,9 @@ import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -137,4 +148,88 @@ public class MetadataTxnTest {
             MetadataManager.INSTANCE.commitTransaction(readMdTxn);
         }
     }
+
+    @Test
+    public void concurrentMetadataTxn() throws Exception {
+        // get create type and dataset
+        String datasetName = "dataset1";
+        final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON;
+        testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int };", format);
+        testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
+
+        // get created dataset
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Dataset sourceDataset;
+        try {
+            sourceDataset = metadataProvider.findDataset(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME, datasetName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+
+        /*
+         * Concurrently insert copies of the created dataset with
+         * different names and either commit or abort the transaction.
+         */
+        final AtomicInteger failCount = new AtomicInteger(0);
+        Thread transactor1 = new Thread(() -> IntStream.range(1, 100).forEach(x -> {
+            try {
+                addDataset(appCtx, sourceDataset, x, x % 2 == 0);
+            } catch (Exception e) {
+                e.printStackTrace();
+                failCount.incrementAndGet();
+            }
+        }));
+
+        Thread transactor2 = new Thread(() -> IntStream.range(101, 200).forEach(x -> {
+            try {
+                addDataset(appCtx, sourceDataset, x, x % 3 == 0);
+            } catch (Exception e) {
+                e.printStackTrace();
+                failCount.incrementAndGet();
+            }
+        }));
+
+        transactor1.start();
+        transactor2.start();
+        transactor1.join();
+        transactor2.join();
+
+        Assert.assertEquals(0, failCount.get());
+
+        // make sure all metadata indexes have no pending operations after all txns committed/aborted
+        final IDatasetLifecycleManager datasetLifecycleManager =
+                ((INcApplicationContext) integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
+        int maxMetadatasetId = 14;
+        for (int i = 1; i <= maxMetadatasetId; i++) {
+            if (datasetLifecycleManager.getIndex(i, i) != null) {
+                final PrimaryIndexOperationTracker opTracker = datasetLifecycleManager.getOperationTracker(i);
+                Assert.assertEquals(0, opTracker.getNumActiveOperations());
+            }
+        }
+    }
+
+    private void addDataset(ICcApplicationContext appCtx, Dataset source, int datasetPostfix, boolean abort)
+            throws Exception {
+        Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + datasetPostfix, source.getDataverseName(),
+                source.getDatasetType().name(), source.getNodeGroupName(), NoMergePolicyFactory.NAME, null,
+                source.getDatasetDetails(), source.getHints(), DatasetConfig.DatasetType.INTERNAL, datasetPostfix, 0);
+        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext writeTxn = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(writeTxn);
+        try {
+            MetadataManager.INSTANCE.addDataset(writeTxn, dataset);
+            if (abort) {
+                MetadataManager.INSTANCE.abortTransaction(writeTxn);
+            } else {
+                MetadataManager.INSTANCE.commitTransaction(writeTxn);
+            }
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index eb47248..cb83d56 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -128,8 +128,6 @@ public class DiskIsFullTest {
                         KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx =
-                        nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
                 // Prepare insert operation
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 6f35a3d..ababe9c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -174,7 +174,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
         if (modificationCallback != NoOpOperationCallback.INSTANCE) {
             numActiveOperations.incrementAndGet();
-            ((AbstractOperationCallback) modificationCallback).incrementLocalNumActiveOperations();
+            ((AbstractOperationCallback) modificationCallback).beforeOperation();
         }
     }
 
@@ -182,7 +182,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
         if (modificationCallback != NoOpOperationCallback.INSTANCE) {
             numActiveOperations.decrementAndGet();
-            ((AbstractOperationCallback) modificationCallback).decrementLocalNumActiveOperations();
+            ((AbstractOperationCallback) modificationCallback).afterOperation();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ACIDException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ACIDException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ACIDException.java
index 7b489f2..77634eb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ACIDException.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ACIDException.java
@@ -27,7 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
  * ACIDException encountered during crash recovery shall not have a transaction
  * context as recovery does not happen as part of a transaction.
  */
-public class ACIDException extends Exception {
+public class ACIDException extends RuntimeException {
 
     private static final long serialVersionUID = -8855848112541877323L;
     private ITransactionContext txnContext;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
index 9844344..098bbd4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
@@ -31,14 +31,16 @@ public abstract class AbstractOperationCallback {
     protected final ITransactionContext txnCtx;
     protected final ILockManager lockManager;
     protected final long[] longHashes;
+    protected final long resourceId;
 
-    public AbstractOperationCallback(DatasetId datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
-            ILockManager lockManager) {
+    public AbstractOperationCallback(DatasetId datasetId, long resourceId, int[] primaryKeyFields,
+            ITransactionContext txnCtx, ILockManager lockManager) {
         this.datasetId = datasetId;
+        this.resourceId = resourceId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;
-        this.longHashes = new long[2];
+        longHashes = new long[2];
     }
 
     public int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
@@ -46,12 +48,11 @@ public abstract class AbstractOperationCallback {
         return Math.abs((int) longHashes[0]);
     }
 
-    public void incrementLocalNumActiveOperations() {
-        txnCtx.incrementNumActiveOperations();
+    public void beforeOperation() {
+        txnCtx.beforeOperation(resourceId);
     }
 
-    public void decrementLocalNumActiveOperations() {
-        txnCtx.decrementNumActiveOperations();
+    public void afterOperation() {
+        txnCtx.afterOperation(resourceId);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 3dda5d3..c4a2d03 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -19,43 +19,132 @@
 package org.apache.asterix.common.transactions;
 
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+
+/**
+ * A typical transaction lifecycle goes through the following steps:
+ * 1. {@link ITransactionContext#register(long, ILSMIndex, IModificationOperationCallback, boolean)}
+ * 2. {@link ITransactionContext#beforeOperation(long)}
+ * 3. {@link ITransactionContext#notifyUpdateCommitted(long)}
+ * 4. {@link ITransactionContext#notifyEntityCommitted}
+ * 5. {@link ITransactionContext#afterOperation(long)}
+ * 6. {@link ITransactionContext#complete()}
+ */
 public interface ITransactionContext {
 
-    public void registerIndexAndCallback(long resourceId, ILSMIndex index, AbstractOperationCallback callback,
-            boolean isPrimaryIndex);
-
-    public TxnId getTxnId();
-
-    public void setTimeout(boolean isTimeout);
+    /**
+     * Registers {@link ILSMIndex} in the transaction. Registering an index
+     * must be done before any operation is performed on the index by this
+     * transaction.
+     *
+     * @param resourceId
+     * @param index
+     * @param callback
+     * @param primaryIndex
+     */
+    void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex);
 
-    public boolean isTimeout();
+    /**
+     * Gets the unique transaction id.
+     *
+     * @return the unique transaction id
+     */
+    TxnId getTxnId();
 
-    public void setTxnState(int txnState);
+    /**
+     * Sets a flag indicating that the transaction timed out.
+     *
+     * @param isTimeout
+     */
+    void setTimeout(boolean isTimeout);
 
-    public int getTxnState();
+    /**
+     * Tests if the transaction was timed out.
+     *
+     * @return true if this transaction timed out. Otherwise false.
+     */
+    boolean isTimeout();
 
-    public long getFirstLSN();
+    /**
+     * Sets the state if this transaction.
+     *
+     * @param txnState
+     */
+    void setTxnState(int txnState);
 
-    public long getLastLSN();
+    /**
+     * Gets the current state of this transaction.
+     *
+     * @return the current state of this transaction
+     */
+    int getTxnState();
 
-    public void setLastLSN(long LSN);
+    /**
+     * Gets the first log sequence number of this transaction.
+     *
+     * @return the first log sequence number
+     */
+    long getFirstLSN();
 
-    public boolean isWriteTxn();
+    /**
+     * Gets the last log sequence number of this transactions.
+     *
+     * @return the last log sequence number
+     */
+    long getLastLSN();
 
-    public void setWriteTxn(boolean isWriterTxn);
+    /**
+     * Sets the last log sequence number of this transactions.
+     *
+     * @param newValue
+     */
+    void setLastLSN(long newValue);
 
-    public String prettyPrint();
+    /**
+     * Tests if this is a write transaction.
+     *
+     * @return true if this is a write transaction, otherwise false.
+     */
+    boolean isWriteTxn();
 
-    public void setMetadataTransaction(boolean isMetadataTxn);
+    /**
+     * Sets a flag indication that this is a write transaction.
+     *
+     * @param isWriterTxn
+     */
+    void setWriteTxn(boolean isWriterTxn);
 
-    public boolean isMetadataTransaction();
+    /**
+     * Called before an operation is performed on index
+     * with resource id {@code resourceId}.
+     *
+     * @param resourceId
+     */
+    void beforeOperation(long resourceId);
 
-    public void notifyOptracker(boolean isJobLevelCommit);
+    /**
+     * Called to notify the transaction that an update log belonging
+     * to this transaction on index with {@code resourceId} has been
+     * flushed to disk.
+     *
+     * @param resourceId
+     */
+    void notifyUpdateCommitted(long resourceId);
 
-    public void incrementNumActiveOperations();
+    /**
+     * Called to notify the transaction that an entity commit
+     * log belonging to this transaction has been flushed to
+     * disk.
+     */
+    void notifyEntityCommitted();
 
-    public void decrementNumActiveOperations();
+    /**
+     * Called after an operation is performed on index
+     * with resource id {@code resourceId}.
+     *
+     * @param resourceId
+     */
+    void afterOperation(long resourceId);
 
     /**
      * Called when no further operations will be performed by the transaction

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 77c6a9f..9603ce3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -24,105 +24,77 @@ import org.apache.asterix.common.exceptions.ACIDException;
  * Provides APIs for managing life cycle of a transaction, that is beginning a
  * transaction and aborting/committing the transaction.
  */
-
 public interface ITransactionManager {
 
     /**
      * A transaction may be in any of the following states ACTIVE: The
-     * transaction is ongoing and has not yet committed/aborted. COMMITTD: The
+     * transaction is ongoing and has not yet committed/aborted. COMMITTED: The
      * transaction has committed. ABORTED: The transaction has aborted.
      * TIMED_OUT: The transaction has timed out waiting to acquire a lock.
      */
-    public static final int ACTIVE = 0;
-    public static final int COMMITTED = 1;
-    public static final int ABORTED = 2;
-    public static final int TIMED_OUT = 3;
+    int ACTIVE = 0;
+    int COMMITTED = 1;
+    int ABORTED = 2;
+    int TIMED_OUT = 3;
+
+    enum AtomicityLevel {
+        /**
+         * all records are committed or nothing
+         */
+        ATOMIC,
+        /**
+         * any record with entity commit log
+         */
+        ENTITY_LEVEL
+    }
+
+    enum TransactionMode {
+        /**
+         * Transaction performs only read operations
+         */
+        READ,
+        /**
+         * Transaction may perform read and write operations
+         */
+        READ_WRITE
+    }
 
     /**
      * Begins a transaction identified by a transaction id and returns the
      * associated transaction context.
      *
      * @param txnId
-     *            a unique value for the transaction id.
-     * @return the transaction context associated with the initiated transaction
-     * @see ITransactionContext
+     * @param options
+     * @return The transaction context
      * @throws ACIDException
      */
-    public ITransactionContext beginTransaction(TxnId txnId) throws ACIDException;
+    ITransactionContext beginTransaction(TxnId txnId, TransactionOptions options) throws ACIDException;
 
     /**
      * Returns the transaction context of an active transaction given the
      * transaction id.
      *
      * @param txnId
-     *            a unique value for the transaction id.
-     * @param createIfNotExist
-     *            TODO
-     * @return
+     * @return The transaction context
      * @throws ACIDException
      */
-    public ITransactionContext getTransactionContext(TxnId txnId, boolean createIfNotExist) throws ACIDException;
+    ITransactionContext getTransactionContext(TxnId txnId) throws ACIDException;
 
     /**
-     * Commits a transaction.
+     * Commit a transactions
      *
-     * @param txnContext
-     *            the transaction context associated with the transaction
-     * @param datasetId
-     *            TODO
-     * @param pkHash
-     *            TODO
+     * @param txnId
      * @throws ACIDException
-     * @see ITransactionContextimport org.apache.hyracks.api.job.TxnId;
-     * @see ACIDException
      */
-    public void commitTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash)
-            throws ACIDException;
+    void commitTransaction(TxnId txnId) throws ACIDException;
 
     /**
      * Aborts a transaction.
      *
-     * @param txnContext
-     *            the transaction context associated with the transaction
-     * @param datasetId
-     *            TODO
-     * @param pkHash
-     *            TODO
-     * @throws ACIDException
-     * @see ITransactionContext
-     * @see ACIDException
-     */
-    public void abortTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash)
-            throws ACIDException;
-
-    /**
-     * Indicates end of all activity for a transaction. In other words, all
-     * participating threads in the transaction have completed the intended
-     * task.
-     *
-     * @param txnContext
-     *            the transaction context associated with the transaction
-     * @param datasetId
-     *            TODO
-     * @param pkHash
-     *            TODO
-     * @param success
-     *            indicates the success or failure. The transaction is committed
-     *            or aborted accordingly.
+     * @param txnId
      * @throws ACIDException
      */
-    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash,
-            boolean success) throws ACIDException;
-
-    /**
-     * Returns the Transaction Provider for the transaction eco-system. A
-     * transaction eco-system consists of a Log Manager, a Recovery Manager, a
-     * Transaction Manager and a Lock Manager.
-     *
-     * @see ITransactionSubsystem
-     * @return TransactionProvider
-     */
-    public ITransactionSubsystem getTransactionSubsystem();
+    void abortTransaction(TxnId txnId) throws ACIDException;
 
     /**
      * @return The current max txn id.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java
new file mode 100644
index 0000000..48dc452
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
+public class TransactionOptions {
+
+    private final AtomicityLevel atomicityLevel;
+
+    // TODO add TransactionMode(READ/WRITE) to options
+    public TransactionOptions(AtomicityLevel atomicityLevel) {
+        this.atomicityLevel = atomicityLevel;
+    }
+
+    public AtomicityLevel getAtomicityLevel() {
+        return atomicityLevel;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 87a5272..368fc2a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -19,13 +19,14 @@
 
 package org.apache.asterix.metadata;
 
+import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -36,12 +37,12 @@ import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -99,7 +100,6 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import org.apache.asterix.transaction.management.service.transaction.TransactionContext;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -117,7 +117,6 @@ import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -128,7 +127,6 @@ import org.apache.hyracks.storage.common.MultiComparator;
 
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(MetadataNode.class.getName());
     private static final DatasetId METADATA_DATASET_ID =
             new ImmutableDatasetId(MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId());
 
@@ -165,37 +163,29 @@ public class MetadataNode implements IMetadataNode {
 
     @Override
     public void beginTransaction(TxnId transactionId) throws ACIDException, RemoteException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().beginTransaction(transactionId);
-        txnCtx.setMetadataTransaction(true);
+        TransactionOptions options = new TransactionOptions(AtomicityLevel.ATOMIC);
+        transactionSubsystem.getTransactionManager().beginTransaction(transactionId, options);
     }
 
     @Override
     public void commitTransaction(TxnId txnId) throws RemoteException, ACIDException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
-        transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, DatasetId.NULL, -1);
+        transactionSubsystem.getTransactionManager().commitTransaction(txnId);
     }
 
     @Override
     public void abortTransaction(TxnId txnId) throws RemoteException, ACIDException {
-        try {
-            ITransactionContext txnCtx =
-                    transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
-            transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, DatasetId.NULL, -1);
-        } catch (ACIDException e) {
-            LOGGER.log(Level.WARNING, "Exception aborting transaction", e);
-            throw e;
-        }
+        transactionSubsystem.getTransactionManager().abortTransaction(txnId);
     }
 
     @Override
     public void lock(TxnId txnId, byte lockMode) throws ACIDException, RemoteException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(txnId);
         transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
     }
 
     @Override
     public void unlock(TxnId txnId, byte lockMode) throws ACIDException, RemoteException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(txnId);
         transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
     }
 
@@ -472,96 +462,66 @@ public class MetadataNode implements IMetadataNode {
 
     private void insertTupleIntoIndex(TxnId txnId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException {
-        long resourceID = metadataIndex.getResourceId();
-        String resourceName = metadataIndex.getFile().getRelativePath();
-        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
-        try {
-            datasetLifecycleManager.open(resourceName);
-
-            // prepare a Callback for logging
-            IModificationOperationCallback modCallback =
-                    createIndexModificationCallback(txnId, resourceID, metadataIndex, lsmIndex, Operation.INSERT);
-
-            IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
-
-            ITransactionContext txnCtx =
-                    transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
-            txnCtx.setWriteTxn(true);
-            txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
-                    metadataIndex.isPrimaryIndex());
-
-            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
-
-            // TODO: fix exceptions once new BTree exception model is in hyracks.
-            indexAccessor.forceInsert(tuple);
-            // Manually complete the operation after the insert. This is to decrement the
-            // resource counters within the
-            // index that determine how many tuples are still 'in-flight' within the index.
-            // Normally the log flusher
-            // does this. The only exception is the index registered as the "primary" which
-            // we will let be decremented
-            // by the job commit log event
-            if (!((TransactionContext) txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) {
-                lsmIndex.getOperationTracker().completeOperation(lsmIndex, LSMOperationType.FORCE_MODIFICATION, null,
-                        modCallback);
-            }
-        } finally {
-            datasetLifecycleManager.close(resourceName);
-        }
+        modifyMetadataIndex(Operation.INSERT, txnId, metadataIndex, tuple);
     }
 
     private void upsertTupleIntoIndex(TxnId txnId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException {
-        long resourceId = metadataIndex.getResourceId();
+        modifyMetadataIndex(Operation.UPSERT, txnId, metadataIndex, tuple);
+    }
+
+    private void modifyMetadataIndex(Operation op, TxnId txnId, IMetadataIndex metadataIndex, ITupleReference tuple)
+            throws ACIDException, HyracksDataException {
         String resourceName = metadataIndex.getFile().getRelativePath();
         ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         datasetLifecycleManager.open(resourceName);
         try {
-            // prepare a Callback for logging
-            ITransactionContext txnCtx =
-                    transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
-            IModificationOperationCallback modCallback =
-                    new UpsertOperationCallback(metadataIndex.getDatasetId(), metadataIndex.getPrimaryKeyIndexes(),
-                            txnCtx, transactionSubsystem.getLockManager(), transactionSubsystem, resourceId,
-                            metadataStoragePartition, ResourceType.LSM_BTREE, Operation.UPSERT);
+            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(txnId);
+            IModificationOperationCallback modCallback = createIndexModificationCallback(op, txnCtx, metadataIndex);
             IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
             txnCtx.setWriteTxn(true);
-            txnCtx.registerIndexAndCallback(resourceId, lsmIndex, (AbstractOperationCallback) modCallback,
-                    metadataIndex.isPrimaryIndex());
+            txnCtx.register(metadataIndex.getResourceId(), lsmIndex, modCallback, metadataIndex.isPrimaryIndex());
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
-            indexAccessor.forceUpsert(tuple);
-            // Manually complete the operation after the insert. This is to decrement the
-            // resource counters within the
-            // index that determine how many tuples are still 'in-flight' within the index.
-            // Normally the log flusher
-            // does this. The only exception is the index registered as the "primary" which
-            // we will let be decremented
-            // by the job commit log event
-            if (!((TransactionContext) txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) {
-                lsmIndex.getOperationTracker().completeOperation(lsmIndex, LSMOperationType.FORCE_MODIFICATION, null,
-                        modCallback);
+            switch (op) {
+                case INSERT:
+                    indexAccessor.insert(tuple);
+                    break;
+                case DELETE:
+                    indexAccessor.delete(tuple);
+                    break;
+                case UPSERT:
+                    indexAccessor.upsert(tuple);
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown operation type: " + op);
             }
         } finally {
             datasetLifecycleManager.close(resourceName);
         }
     }
 
-    private IModificationOperationCallback createIndexModificationCallback(TxnId txnId, long resourceId,
-            IMetadataIndex metadataIndex, ILSMIndex lsmIndex, Operation indexOp) throws ACIDException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
-
-        // Regardless of the index type (primary or secondary index), secondary index
-        // modification callback is given
-        // This is still correct since metadata index operation doesn't require any lock
-        // from ConcurrentLockMgr and
-        // The difference between primaryIndexModCallback and secondaryIndexModCallback
-        // is that primary index requires
-        // locks and secondary index doesn't.
-        return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId(),
-                metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
+    private IModificationOperationCallback createIndexModificationCallback(Operation indexOp,
+            ITransactionContext txnCtx, IMetadataIndex metadataIndex) {
+        switch (indexOp) {
+            case INSERT:
+            case DELETE:
+                /*
+                 * Regardless of the index type (primary or secondary index), secondary index modification
+                 * callback is given. This is still correct since metadata index operation doesn't require
+                 * any lock from ConcurrentLockMgr.
+                 */
+                return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId(),
+                        metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
+                        transactionSubsystem, metadataIndex.getResourceId(), metadataStoragePartition,
+                        ResourceType.LSM_BTREE, indexOp);
+            case UPSERT:
+                return new UpsertOperationCallback(metadataIndex.getDatasetId(), metadataIndex.getPrimaryKeyIndexes(),
+                        txnCtx, transactionSubsystem.getLockManager(), transactionSubsystem,
+                        metadataIndex.getResourceId(), metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
+            default:
+                throw new IllegalStateException("Unknown operation type: " + indexOp);
+        }
     }
 
     @Override
@@ -822,40 +782,7 @@ public class MetadataNode implements IMetadataNode {
 
     private void deleteTupleFromIndex(TxnId txnId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException {
-        long resourceID = metadataIndex.getResourceId();
-        String resourceName = metadataIndex.getFile().getRelativePath();
-        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
-        try {
-            datasetLifecycleManager.open(resourceName);
-            // prepare a Callback for logging
-            IModificationOperationCallback modCallback =
-                    createIndexModificationCallback(txnId, resourceID, metadataIndex, lsmIndex, Operation.DELETE);
-            IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
-
-            ITransactionContext txnCtx =
-                    transactionSubsystem.getTransactionManager().getTransactionContext(txnId, false);
-            txnCtx.setWriteTxn(true);
-            txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
-                    metadataIndex.isPrimaryIndex());
-
-            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
-
-            indexAccessor.forceDelete(tuple);
-            // Manually complete the operation after the insert. This is to decrement the
-            // resource counters within the
-            // index that determine how many tuples are still 'in-flight' within the index.
-            // Normally the log flusher
-            // does this. The only exception is the index registered as the "primary" which
-            // we will let be decremented
-            // by the job commit log event
-            if (!((TransactionContext) txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) {
-                lsmIndex.getOperationTracker().completeOperation(lsmIndex, LSMOperationType.FORCE_MODIFICATION, null,
-                        modCallback);
-            }
-        } finally {
-            datasetLifecycleManager.close(resourceName);
-        }
+        modifyMetadataIndex(Operation.DELETE, txnId, metadataIndex, tuple);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 23ad1e1..d3c3fe7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,12 +18,14 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
+import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
@@ -79,10 +81,13 @@ public class JobEventListenerFactory implements IJobEventListenerFactory {
                 try {
                     ITransactionManager txnManager = ((INcApplicationContext) jobletContext.getServiceContext()
                             .getApplicationContext()).getTransactionSubsystem().getTransactionManager();
-                    ITransactionContext txnContext = txnManager.getTransactionContext(txnId, false);
+                    ITransactionContext txnContext = txnManager.getTransactionContext(txnId);
                     txnContext.setWriteTxn(transactionalWrite);
-                    txnManager.completedTransaction(txnContext, DatasetId.NULL, -1,
-                            !(jobStatus == JobStatus.FAILURE));
+                    if (jobStatus != JobStatus.FAILURE) {
+                        txnManager.commitTransaction(txnId);
+                    } else {
+                        txnManager.abortTransaction(txnId);
+                    }
                 } catch (ACIDException e) {
                     throw new Error(e);
                 }
@@ -91,8 +96,9 @@ public class JobEventListenerFactory implements IJobEventListenerFactory {
             @Override
             public void jobletStart() {
                 try {
+                    TransactionOptions options = new TransactionOptions(AtomicityLevel.ENTITY_LEVEL);
                     ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
-                            .getTransactionSubsystem().getTransactionManager().getTransactionContext(txnId, true);
+                            .getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, options);
                 } catch (ACIDException e) {
                     throw new Error(e);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index 23c86f3..bfe1925 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -23,9 +23,9 @@ import java.util.List;
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
@@ -75,10 +75,13 @@ public class MultiTransactionJobletEventListenerFactory implements IJobEventList
                             ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
                                     .getTransactionSubsystem().getTransactionManager();
                     for (TxnId txnId : txnIds) {
-                        ITransactionContext txnContext = txnManager.getTransactionContext(txnId, false);
+                        ITransactionContext txnContext = txnManager.getTransactionContext(txnId);
                         txnContext.setWriteTxn(transactionalWrite);
-                        txnManager.completedTransaction(txnContext, DatasetId.NULL, -1,
-                                !(jobStatus == JobStatus.FAILURE));
+                        if (jobStatus != JobStatus.FAILURE) {
+                            txnManager.commitTransaction(txnId);
+                        } else {
+                            txnManager.abortTransaction(txnId);
+                        }
                     }
                 } catch (ACIDException e) {
                     throw new Error(e);
@@ -88,9 +91,11 @@ public class MultiTransactionJobletEventListenerFactory implements IJobEventList
             @Override
             public void jobletStart() {
                 try {
+                    TransactionOptions options =
+                            new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
                     for (TxnId txnId : txnIds) {
                         ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
-                                .getTransactionSubsystem().getTransactionManager().getTransactionContext(txnId, true);
+                                .getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, options);
                     }
                 } catch (ACIDException e) {
                     throw new Error(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 6f7287b..d61e9a0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -78,7 +78,7 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat
                     ILockManager lockManager = appCtx.getTransactionSubsystem().getLockManager();
                     ITransactionManager txnManager = appCtx.getTransactionSubsystem().getTransactionManager();
                     // get the local transaction
-                    ITransactionContext txnCtx = txnManager.getTransactionContext(txnId, false);
+                    ITransactionContext txnCtx = txnManager.getTransactionContext(txnId);
                     // lock the dataset granule
                     lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
                     // flush the dataset synchronously

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 3a2f195..3da9e83 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -68,7 +68,6 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
         }
     }
 
-    protected final long resourceId;
     protected final byte resourceType;
     protected final Operation indexOp;
     protected final ITransactionSubsystem txnSubsystem;
@@ -77,8 +76,7 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
     protected AbstractIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
             int resourcePartition, byte resourceType, Operation indexOp) {
-        super(datasetId, primaryKeyFields, txnCtx, lockManager);
-        this.resourceId = resourceId;
+        super(datasetId, resourceId, primaryKeyFields, txnCtx, lockManager);
         this.resourceType = resourceType;
         this.indexOp = indexOp;
         this.txnSubsystem = txnSubsystem;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index c97fb1b..fe17b39 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -45,10 +45,10 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
     private final ILogRecord logRecord;
     private int pkHash;
 
-    public LockThenSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
+    public LockThenSearchOperationCallback(DatasetId datasetId, long resourceId, int[] entityIdFields,
             ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx,
             IOperatorNodePushable operatorNodePushable) {
-        super(datasetId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
+        super(datasetId, resourceId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
         this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
         this.logManager = txnSubsystem.getLogManager();
         this.logRecord = new LogRecord();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 3f3dbd9..9f96263 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -49,9 +49,9 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
-            return new LockThenSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx,
-                    operatorNodePushable);
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+            return new LockThenSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
+                    txnSubsystem, txnCtx, operatorNodePushable);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index b13a08e..ec776a5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -35,9 +35,9 @@ import org.apache.hyracks.storage.common.ISearchOperationCallback;
 public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback
         implements ISearchOperationCallback {
 
-    public PrimaryIndexInstantSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
+    public PrimaryIndexInstantSearchOperationCallback(DatasetId datasetId, long resourceId, int[] entityIdFields,
             ILockManager lockManager, ITransactionContext txnCtx) {
-        super(datasetId, entityIdFields, txnCtx, lockManager);
+        super(datasetId, resourceId, entityIdFields, txnCtx, lockManager);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index 93108f9..f9c8e3c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -34,8 +34,8 @@ import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
-public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractOperationCallbackFactory implements
-        ISearchOperationCallbackFactory {
+public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements ISearchOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -45,15 +45,15 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
     }
 
     @Override
-    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
-            throws HyracksDataException {
+    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx,
+            IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
-            return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
-                    txnSubsystem.getLockManager(), txnCtx);
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+            return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), resourceId,
+                    primaryKeyFields, txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index fb01952..8f5e386 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -70,12 +69,12 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp, operatorNodePushable);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
+            txnCtx.register(resource.getId(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index a9075d0..961f799 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -34,9 +34,9 @@ import org.apache.hyracks.storage.common.ISearchOperationCallback;
  */
 public class PrimaryIndexSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
 
-    public PrimaryIndexSearchOperationCallback(DatasetId datasetId, int[] entityIdFields, ILockManager lockManager,
-            ITransactionContext txnCtx) {
-        super(datasetId, entityIdFields, txnCtx, lockManager);
+    public PrimaryIndexSearchOperationCallback(DatasetId datasetId, long resourceId, int[] entityIdFields,
+            ILockManager lockManager, ITransactionContext txnCtx) {
+        super(datasetId, resourceId, entityIdFields, txnCtx, lockManager);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 076b0d9..64cbbc9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -51,8 +51,8 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
-            return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+            return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 5882046..3fc42c9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@ import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -66,12 +65,12 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false);
+            txnCtx.register(resource.getId(), index, modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5070d633/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
index 108a77e..1b87376 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -31,8 +31,8 @@ import org.apache.hyracks.storage.common.ISearchOperationCallback;
 public class SecondaryIndexSearchOperationCallback extends AbstractOperationCallback
         implements ISearchOperationCallback {
 
-    public SecondaryIndexSearchOperationCallback() {
-        super(DatasetId.NULL, null, null, null);
+    public SecondaryIndexSearchOperationCallback(long resourceId) {
+        super(DatasetId.NULL, resourceId, null, null, null);
     }
 
     @Override