You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/03/14 18:13:06 UTC

[1/3] asterixdb git commit: Fix transaction logs and optimize upserts

Repository: asterixdb
Updated Branches:
  refs/heads/master 89328a88a -> 04075533a


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
index 70a803b..fd4dae5 100644
--- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
+++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
@@ -70,53 +70,63 @@ abstract class Request {
         switch (kind) {
             case INSTANT_TRY_LOCK:
                 return new Request(kind, txnCtx) {
+                    @Override
                     boolean execute(ILockManager lockMgr) throws ACIDException {
                         return lockMgr.instantTryLock(dsId, hashValue, lockMode, txnCtx);
                     }
 
+                    @Override
                     public String toString() {
                         return asString(kind, txnCtx, dsId, hashValue, lockMode);
                     }
                 };
             case INSTANT_LOCK:
                 return new Request(kind, txnCtx) {
+                    @Override
                     boolean execute(ILockManager lockMgr) throws ACIDException {
                         lockMgr.instantLock(dsId, hashValue, lockMode, txnCtx);
                         return true;
                     }
 
+                    @Override
                     public String toString() {
                         return asString(kind, txnCtx, dsId, hashValue, lockMode);
                     }
                 };
             case LOCK:
                 return new Request(kind, txnCtx) {
+                    @Override
                     boolean execute(ILockManager lockMgr) throws ACIDException {
                         lockMgr.lock(dsId, hashValue, lockMode, txnCtx);
                         return true;
                     }
 
+                    @Override
                     public String toString() {
                         return asString(kind, txnCtx, dsId, hashValue, lockMode);
                     }
                 };
             case TRY_LOCK:
                 return new Request(kind, txnCtx) {
+                    @Override
                     boolean execute(ILockManager lockMgr) throws ACIDException {
                         return lockMgr.tryLock(dsId, hashValue, lockMode, txnCtx);
                     }
 
+                    @Override
                     public String toString() {
                         return asString(kind, txnCtx, dsId, hashValue, lockMode);
                     }
                 };
             case UNLOCK:
                 return new Request(kind, txnCtx) {
+                    @Override
                     boolean execute(ILockManager lockMgr) throws ACIDException {
                         lockMgr.unlock(dsId, hashValue, lockMode, txnCtx);
                         return true;
                     }
 
+                    @Override
                     public String toString() {
                         return asString(kind, txnCtx, dsId, hashValue, lockMode);
                     }
@@ -129,11 +139,13 @@ abstract class Request {
     static Request create(final Kind kind, final ITransactionContext txnCtx) {
         if (kind == Kind.RELEASE) {
             return new Request(kind, txnCtx) {
+                @Override
                 boolean execute(ILockManager lockMgr) throws ACIDException {
                     lockMgr.releaseLocks(txnCtx);
                     return true;
                 }
 
+                @Override
                 public String toString() {
                     return txnCtx.getJobId().toString() + ":" + kind.name();
                 }
@@ -145,6 +157,7 @@ abstract class Request {
     static Request create(final Kind kind, final PrintStream out) {
         if (kind == Kind.PRINT) {
             return new Request(kind, null) {
+                @Override
                 boolean execute(ILockManager lockMgr) throws ACIDException {
                     if (out == null) {
                         return false;
@@ -157,6 +170,7 @@ abstract class Request {
                     return true;
                 }
 
+                @Override
                 public String toString() {
                     return kind.name();
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml
index 66e3a96..81ca3c3 100644
--- a/asterixdb/asterix-yarn/pom.xml
+++ b/asterixdb/asterix-yarn/pom.xml
@@ -136,12 +136,10 @@
                 <include>org.apache.hadoop:hadoop-common</include>
                 <include>org.apache.hadoop:hadoop-hdfs</include>
                 <include>org.apache.hadoop:hadoop-auth</include>
-                <include>org.apache.hadoop:hadoop-yarn-client</include>
                 <include>org.apache.hadoop:hadoop-yarn-common</include>
                 <include>org.apache.hadoop:hadoop-yarn-api</include>
                 <include>org.apache.httpcomponents:httpcore</include>
                 <include>org.apache.httpcomponents:httpclient</include>
-                <include>org.htrace:htrace-core</include>
                 <include>commons-httpclient:commons-httpclient</include>
                 <include>com.google.protobuf:protobuf-java</include>
                 <include>com.google.guava:guava</include>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
index 89d043c..85db3af 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
@@ -28,5 +28,4 @@ public abstract class AbstractOneInputOneOutputPushRuntime extends AbstractOneIn
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
         this.inputRecordDesc = recordDescriptor;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
index f22c239..902af7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
@@ -25,19 +25,14 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
  * This operation callback allows for arbitrary actions to be taken while traversing
  * an index structure. The {@link IModificationOperationCallback} will be called on
  * all modifying operations (e.g. insert, update, delete...) for all indexes.
- *
- * @author zheilbron
  */
 public interface IModificationOperationCallback {
-    public enum Operation {
-        INSERT,
-        DELETE
-    }
 
     /**
      * This method is called on a tuple that is about to traverse an index's structure
      * (i.e. before any pages are pinned or latched).
      * The format of the tuple is the format that would be stored in the index itself.
+     *
      * @param tuple
      *            the tuple that is about to be operated on
      */
@@ -49,16 +44,9 @@ public interface IModificationOperationCallback {
      * matching key was found.
      * When found is called, the leaf page where the tuple resides will be pinned and latched,
      * so blocking operations should be avoided.
+     *
      * @param tuple
      *            a tuple with a matching key, otherwise null if none exists
      */
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException;
-
-    /**
-     * This call specifies the next operation to be performed. It is used to allow
-     * a single operator to perform different operations per tuple
-     * @param op
-     * @throws HyracksDataException
-     */
-    public void setOp(Operation op) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index e8ab8dc..511d166 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -59,9 +59,4 @@ public enum NoOpOperationCallback implements IModificationOperationCallback,ISea
     public void complete(ITupleReference tuple) throws HyracksDataException {
         // Do nothing.
     }
-
-    @Override
-    public void setOp(Operation op) throws HyracksDataException {
-        // Do nothing.
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index b78de8b..3ecb5e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -286,10 +286,10 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             case PHYSICALDELETE:
             case FLUSH:
             case DELETE:
+            case UPSERT:
                 operationalComponents.add(memoryComponents.get(cmc));
                 break;
             case INSERT:
-            case UPSERT:
                 addOperationalMutableComponents(operationalComponents);
                 operationalComponents.addAll(immutableComponents);
                 break;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 90c70aa..b0f366b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -174,6 +174,15 @@ public interface ILSMIndexAccessor extends IIndexAccessor {
     void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
 
     /**
+     * Force upserting the tuple into the memory component even if it is full
+     *
+     * @param tuple
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+    /**
      * Schedule a replication for disk components
      *
      * @param diskComponents

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 0fa69ec..1f93fc5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -161,6 +161,12 @@ public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessor {
     }
 
     @Override
+    public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+        ctx.setOperation(IndexOperation.UPSERT);
+        lsmHarness.forceModify(ctx, tuple);
+    }
+
+    @Override
     public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.DELETE);
         lsmHarness.forceModify(ctx, tuple);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 0a6ffd7..604a57c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -200,4 +200,9 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd
         lsmHarness.forceUpdateMeta(ctx, key, value);
     }
 
+    @Override
+    public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+        throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
index 6da9334..65a3067 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -98,11 +98,6 @@ public abstract class AbstractModificationOperationCallbackTest extends Abstract
             }
             Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, after));
         }
-
-        @Override
-        public void setOp(Operation op) throws HyracksDataException {
-        }
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
index 15d8a60..664d9d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
@@ -65,10 +65,4 @@ public enum TestOperationCallback implements ISearchOperationCallback,IModificat
     public void complete(ITupleReference tuple) throws HyracksDataException {
         // Do nothing.
     }
-
-    @Override
-    public void setOp(Operation op) throws HyracksDataException {
-        // Do nothing.
-    }
-
 }


[2/3] asterixdb git commit: Fix transaction logs and optimize upserts

Posted by am...@apache.org.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index eab9cc7..0db5ff0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -20,11 +20,12 @@ package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -53,6 +54,7 @@ public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdate
     private int numberOfFields;
     private boolean isNewNull = false;
     private boolean isPrevValueNull = false;
+    private AbstractIndexModificationOperationCallback abstractModCallback;
 
     public LSMSecondaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
@@ -62,6 +64,12 @@ public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdate
         this.numberOfFields = prevValuePermutation.length;
     }
 
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
+    }
+
     public static boolean equals(byte[] a, int aOffset, int aLength, byte[] b, int bOffset, int bLength) {
         if (aLength != bLength) {
             return false;
@@ -109,12 +117,12 @@ public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdate
                 }
                 if (!isPrevValueNull) {
                     // previous is not null, we need to delete previous
-                    modCallback.setOp(Operation.DELETE);
+                    abstractModCallback.setOp(Operation.DELETE);
                     lsmAccessor.forceDelete(prevValueTuple);
                 }
                 if (!isNewNull) {
                     // new is not null, we need to insert the new value
-                    modCallback.setOp(Operation.INSERT);
+                    abstractModCallback.setOp(Operation.INSERT);
                     lsmAccessor.forceInsert(tuple);
                 }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index ccb63ac..aca6455 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -45,7 +46,7 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat
     public FlushDatasetOperatorDescriptor(IOperatorDescriptorRegistry spec, JobId jobId, int datasetId) {
         super(spec, 1, 0);
         this.jobId = jobId;
-        this.datasetId = new DatasetId(datasetId);
+        this.datasetId = new ImmutableDatasetId(datasetId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml
index a65a436..2ff0ccd 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -106,11 +106,6 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-om</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 2a3467e..0e8f119 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -20,6 +20,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -28,23 +29,54 @@ import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
-public abstract class AbstractIndexModificationOperationCallback extends AbstractOperationCallback {
+public abstract class AbstractIndexModificationOperationCallback extends AbstractOperationCallback
+        implements IModificationOperationCallback {
+    public static final byte INSERT_BYTE = 0x01;
+    public static final byte DELETE_BYTE = 0x02;
+    public static final byte UPSERT_BYTE = 0x03;
+
+    public enum Operation {
+        INSERT(INSERT_BYTE),
+        DELETE(DELETE_BYTE),
+        UPSERT(UPSERT_BYTE);
+        private byte value;
+
+        Operation(byte value) {
+            this.value = value;
+        }
+
+        byte value() {
+            return value;
+        }
+
+        public static Operation get(IndexOperation op) {
+            switch (op) {
+                case DELETE:
+                    return DELETE;
+                case INSERT:
+                    return INSERT;
+                case UPSERT:
+                    return UPSERT;
+                default:
+                    throw new IllegalArgumentException();
+
+            }
+        }
+    }
 
-    private static final byte INSERT_OP = (byte) IndexOperation.INSERT.ordinal();
-    private static final byte DELETE_OP = (byte) IndexOperation.DELETE.ordinal();
     protected final long resourceId;
     protected final byte resourceType;
-    protected final IndexOperation indexOp;
+    protected final Operation indexOp;
     protected final ITransactionSubsystem txnSubsystem;
     protected final ILogRecord logRecord;
 
-    protected AbstractIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+    protected AbstractIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
-            int resourcePartition, byte resourceType, IndexOperation indexOp) {
+            int resourcePartition, byte resourceType, Operation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager);
         this.resourceId = resourceId;
         this.resourceType = resourceType;
@@ -54,10 +86,10 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
         logRecord.setTxnCtx(txnCtx);
         logRecord.setLogType(LogType.UPDATE);
         logRecord.setJobId(txnCtx.getJobId().getId());
-        logRecord.setDatasetId(datasetId);
+        logRecord.setDatasetId(datasetId.getId());
         logRecord.setResourceId(resourceId);
         logRecord.setResourcePartition(resourcePartition);
-        logRecord.setNewOp((byte) (indexOp.ordinal()));
+        logRecord.setNewOp(indexOp.value());
     }
 
     protected void log(int PKHash, ITupleReference newValue, ITupleReference oldValue) throws ACIDException {
@@ -81,14 +113,14 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
         txnSubsystem.getLogManager().log(logRecord);
     }
 
+    /**
+     * This call specifies the next operation to be performed. It is used to allow
+     * a single operator to perform different operations per tuple
+     *
+     * @param op
+     * @throws HyracksDataException
+     */
     public void setOp(Operation op) throws HyracksDataException {
-        switch (op) {
-            case DELETE:
-                logRecord.setNewOp(DELETE_OP);
-                break;
-            case INSERT:
-                logRecord.setNewOp(INSERT_OP);
-                break;
-        }
+        logRecord.setNewOp(op.value());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index 288e7a5..4592fd6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -20,6 +20,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -44,7 +45,8 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
     private final ILogRecord logRecord;
     private int pkHash;
 
-    public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ITransactionSubsystem txnSubsystem,
+    public LockThenSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
+            ITransactionSubsystem txnSubsystem,
             ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
         super(datasetId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
         this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 0d65c16..62a57b0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
@@ -46,7 +47,7 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            return new LockThenSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem, txnCtx,
+            return new LockThenSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx,
                     operatorNodePushable);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index b2477cd..890823c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
@@ -34,8 +35,8 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback
         implements ISearchOperationCallback {
 
-    public PrimaryIndexInstantSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
-            ITransactionContext txnCtx) {
+    public PrimaryIndexInstantSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
+            ILockManager lockManager, ITransactionContext txnCtx) {
         super(datasetId, entityIdFields, txnCtx, lockManager);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index ba97287..4485109 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -22,6 +22,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
@@ -47,7 +48,7 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            return new PrimaryIndexInstantSearchOperationCallback(datasetId, primaryKeyFields,
+            return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index c627367..9e96fbb 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
@@ -29,27 +30,21 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 
 /**
  * Assumes LSM-BTrees as primary indexes.
  * Performs locking on primary keys, and also logs before/after images.
  */
-public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
-        implements IModificationOperationCallback {
+public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback {
 
     private final LSMInsertDeleteOperatorNodePushable operatorNodePushable;
-    private final boolean logBeforeImage;
 
-    public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
-            ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable,
-            boolean logBeforeImage) {
+    public PrimaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
+            ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
+            int resourcePartition, byte resourceType, Operation indexOp, IOperatorNodePushable operatorNodePushable) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
         this.operatorNodePushable = (LSMInsertDeleteOperatorNodePushable) operatorNodePushable;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -102,11 +97,7 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            if (logBeforeImage) {
-                log(pkHash, after, before);
-            } else {
-                log(pkHash, after, null);
-            }
+            log(pkHash, after, before);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 4cbb8cf..5a58689 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -23,10 +23,12 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.Resource;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,7 +36,6 @@ import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
@@ -45,15 +46,12 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
         implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
-    private final IndexOperation indexOp;
-    private final boolean logBeforeImage;
+    private final Operation indexOp;
 
     public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
-            boolean logBeforeImage) {
+            ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -70,9 +68,9 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             Resource aResource = (Resource) resource.getResource();
-            IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
-                    aResource.partition(), resourceType, indexOp, operatorNodePushable, logBeforeImage);
+            IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
+                    new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
+                    resource.getId(), aResource.partition(), resourceType, indexOp, operatorNodePushable);
             txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index 9532f9e..4f255e0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -21,6 +21,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
@@ -33,7 +34,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
  */
 public class PrimaryIndexSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
 
-    public PrimaryIndexSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
+    public PrimaryIndexSearchOperationCallback(DatasetId datasetId, int[] entityIdFields, ILockManager lockManager,
             ITransactionContext txnCtx) {
         super(datasetId, entityIdFields, txnCtx, lockManager);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 1dd8368..595db7a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -22,6 +22,7 @@ package org.apache.asterix.transaction.management.opcallbacks;
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
@@ -31,8 +32,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 
-public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperationCallbackFactory implements
-        ISearchOperationCallbackFactory {
+public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements ISearchOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -42,13 +43,13 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio
     }
 
     @Override
-    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
-            throws HyracksDataException {
+    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx,
+            IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
-                    txnCtx);
+            return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), primaryKeyFields,
+                    txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 974e631..7aed1fd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -20,32 +20,25 @@
 package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 
 /**
  * Secondary-index modifications do not require any locking.
  * We assume that the modification of the corresponding primary index has already taken an appropriate lock.
  * This callback performs logging of the before and/or after images for secondary indexes.
  */
-public class SecondaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
-        implements IModificationOperationCallback {
+public class SecondaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback {
 
-    protected final IndexOperation oldOp;
-    private final boolean logBeforeImage;
-
-    public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+    public SecondaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
-            int resourcePartition, byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
+            int resourcePartition, byte resourceType, Operation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
-        oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -57,7 +50,7 @@ public class SecondaryIndexModificationOperationCallback extends AbstractIndexMo
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            this.log(pkHash, after, logBeforeImage ? before : null);
+            this.log(pkHash, after, before);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 3925bba..5a395f7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -23,17 +23,18 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.Resource;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
@@ -41,23 +42,20 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
         implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
-    private final IndexOperation indexOp;
-    private final boolean logBeforeImage;
+    private final Operation indexOp;
 
     public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
-            boolean logBeforeImage) {
+            ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
     public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
             IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IResourceLifecycleManager indexLifeCycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
@@ -66,9 +64,9 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             Resource aResource = (Resource) resource.getResource();
-            IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
-                    aResource.partition(), resourceType, indexOp, logBeforeImage);
+            IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
+                    new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
+                    resource.getId(), aResource.partition(), resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
index ac5f4d4..6905033 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -31,7 +32,7 @@ public class SecondaryIndexSearchOperationCallback extends AbstractOperationCall
         implements ISearchOperationCallback {
 
     public SecondaryIndexSearchOperationCallback() {
-        super(-1, null, null, null);
+        super(DatasetId.NULL, null, null, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
index 32d3461..c609fc6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
@@ -19,13 +19,12 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 
 /**
  * This class is the operation callback for temporary datasets.
@@ -34,12 +33,11 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
  * The "before" and "found" method in this callback is empty so that no locking is requested for accessing a temporary
  * dataset and no write-ahead log is written for update operations.
  */
-public class TempDatasetIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
-        implements IModificationOperationCallback {
+public class TempDatasetIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback {
 
-    public TempDatasetIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+    public TempDatasetIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
-            int resourcePartition, byte resourceType, IndexOperation indexOp) {
+            int resourcePartition, byte resourceType, Operation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 6e07ea3..1956c8d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -23,17 +23,18 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.Resource;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
@@ -41,10 +42,10 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
         implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
-    private final IndexOperation indexOp;
+    private final Operation indexOp;
 
     public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId,
-            int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp,
+            int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp,
             byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
@@ -54,8 +55,8 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
     public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
             IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IResourceLifecycleManager indexLifeCycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
@@ -64,9 +65,9 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             Resource aResource = (Resource) resource.getResource();
-            IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
-                    aResource.partition(), resourceType, indexOp);
+            IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(
+                    new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
+                    resource.getId(), aResource.partition(), resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 2740994..6e955af 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -23,17 +23,18 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.Resource;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
@@ -41,10 +42,10 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
         implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
-    private final IndexOperation indexOp;
+    private final Operation indexOp;
 
     public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId,
-            int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp,
+            int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp,
             byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
@@ -55,8 +56,8 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
             IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         Resource aResource = (Resource) resource.getResource();
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IResourceLifecycleManager indexLifeCycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
@@ -64,9 +65,9 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
 
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
-                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
-                    aResource.partition(), resourceType, indexOp);
+            IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(
+                    new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
+                    resource.getId(), aResource.partition(), resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index 13d2d57..443edf1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -19,24 +19,20 @@
 package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 
-public class UpsertOperationCallback extends AbstractIndexModificationOperationCallback
-        implements IModificationOperationCallback {
-    private final boolean logBeforeImage;
+public class UpsertOperationCallback extends AbstractIndexModificationOperationCallback {
 
-    public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
+    public UpsertOperationCallback(DatasetId datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
-            byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
+            byte resourceType, Operation indexOp) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -48,7 +44,7 @@ public class UpsertOperationCallback extends AbstractIndexModificationOperationC
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
         try {
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
-            log(pkHash, after, logBeforeImage ? before : null);
+            log(pkHash, after, before);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 9349e93..433bbf0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -22,17 +22,18 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.Resource;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
@@ -40,15 +41,12 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
         implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
-    private final IndexOperation indexOp;
-    private final boolean logBeforeImage;
+    protected final Operation indexOp;
 
     public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
-            boolean logBeforeImage) {
+            ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
-        this.logBeforeImage = logBeforeImage;
     }
 
     @Override
@@ -56,8 +54,8 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
             IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
         Resource aResource = (Resource) resource.getResource();
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IResourceLifecycleManager indexLifeCycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
@@ -65,9 +63,9 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
 
         try {
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-            IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId, primaryKeyFields,
-                    txnCtx, txnSubsystem.getLockManager(),
-                    txnSubsystem, resource.getId(), aResource.partition(), resourceType, indexOp, logBeforeImage);
+            IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
+                    primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
+                    aResource.partition(), resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index cfe2a25..1b32d89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -34,18 +34,16 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
     protected final int[] primaryKeyFields;
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
-    protected final int upsertVarIdx;
     protected int[] datasetPartitions;
     protected final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+            boolean isWriteTransaction, int[] datasetPartitions, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
-        this.upsertVarIdx = upsertVarIdx;
         this.datasetPartitions = datasetPartitions;
         this.isSink = isSink;
     }
@@ -57,13 +55,7 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
-        if (upsertVarIdx >= 0) {
-            return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
-                    upsertVarIdx, isSink);
-        } else {
             return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
                     isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
deleted file mode 100644
index 9b2fe36..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.runtime;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class UpsertCommitRuntime extends CommitRuntime {
-    private final int upsertIdx;
-
-    public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
-            boolean isSink) {
-        super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
-                resourcePartition, isSink);
-        this.upsertIdx = upsertIdx;
-    }
-
-    @Override
-    protected void formLogRecord(ByteBuffer buffer, int t) {
-        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength()
-                + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1);
-        if (isNull) {
-            // Previous record not found (insert)
-            super.formLogRecord(buffer, t);
-        } else {
-            // Previous record found (delete + insert)
-            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
-            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
-                    primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 280f8d2..f386b39 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -96,7 +96,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer);
         reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer);
         jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer);
-        jobId2JobSlotMap = new ConcurrentHashMap<Integer, Long>();
+        jobId2JobSlotMap = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -105,15 +105,13 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.lock();
 
-        final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
         final long jobSlot = findOrAllocJobSlot(jobId);
-        final ResourceGroup group = table.get(dsId, entityHashValue);
+        final ResourceGroup group = table.get(datasetId.getId(), entityHashValue);
         group.getLatch();
         try {
             validateJob(txnContext);
-
-            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            final long resSlot = findOrAllocResourceSlot(group, datasetId.getId(), entityHashValue);
             final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
             boolean locked = false;
             while (!locked) {
@@ -152,8 +150,9 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
             group.releaseLatch();
         }
 
-        if (CHECK_CONSISTENCY)
+        if (CHECK_CONSISTENCY) {
             assertLocksCanBefoundInJobQueue();
+        }
     }
 
     private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
@@ -208,37 +207,41 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
 
         static final boolean DEBUG = false;
 
-        ArrayList<Long> slots = new ArrayList<Long>();
-        ArrayList<String> types = new ArrayList<String>();
+        ArrayList<Long> slots = new ArrayList<>();
+        ArrayList<String> types = new ArrayList<>();
 
         @Override
         public void pushResource(long resSlot) {
             types.add("Resource");
             slots.add(resSlot);
-            if (DEBUG)
+            if (DEBUG) {
                 System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            }
         }
 
         @Override
         public void pushRequest(long reqSlot) {
             types.add("Request");
             slots.add(reqSlot);
-            if (DEBUG)
+            if (DEBUG) {
                 System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            }
         }
 
         @Override
         public void pushJob(long jobSlot) {
             types.add("Job");
             slots.add(jobSlot);
-            if (DEBUG)
+            if (DEBUG) {
                 System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            }
         }
 
         @Override
         public void pop() {
-            if (DEBUG)
+            if (DEBUG) {
                 System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            }
             types.remove(types.size() - 1);
             slots.remove(slots.size() - 1);
         }
@@ -325,9 +328,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.instantLock();
 
-        final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
-        final ResourceGroup group = table.get(dsId, entityHashValue);
+        final ResourceGroup group = table.get(datasetId.getId(), entityHashValue);
         if (group.firstResourceIndex.get() == NILL) {
             validateJob(txnContext);
             // if we do not have a resource in the group, we know that the
@@ -341,8 +343,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         group.getLatch();
         try {
             validateJob(txnContext);
-
-            final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+            final long resSlot = findResourceInGroup(group, datasetId.getId(), entityHashValue);
             if (resSlot < 0) {
                 // if we don't find the resource, there are no locks on it.
                 return;
@@ -373,8 +374,9 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         } finally {
             if (reqSlot != NILL) {
                 // deallocate request, if we allocated one earlier
-                if (DEBUG_MODE)
+                if (DEBUG_MODE) {
                     LOGGER.finer("del req slot " + TypeUtil.Global.toString(reqSlot));
+                }
                 reqArenaMgr.deallocate(reqSlot);
             }
             group.releaseLatch();
@@ -387,16 +389,15 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.tryLock();
 
-        final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
         final long jobSlot = findOrAllocJobSlot(jobId);
-        final ResourceGroup group = table.get(dsId, entityHashValue);
+        final ResourceGroup group = table.get(datasetId.getId(), entityHashValue);
         group.getLatch();
 
         try {
             validateJob(txnContext);
 
-            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            final long resSlot = findOrAllocResourceSlot(group, datasetId.getId(), entityHashValue);
             final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
 
             final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
@@ -424,9 +425,8 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.instantTryLock();
 
-        final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
-        final ResourceGroup group = table.get(dsId, entityHashValue);
+        final ResourceGroup group = table.get(datasetId.getId(), entityHashValue);
         if (group.firstResourceIndex.get() == NILL) {
             validateJob(txnContext);
             // if we do not have a resource in the group, we know that the
@@ -438,7 +438,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         try {
             validateJob(txnContext);
 
-            final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+            final long resSlot = findResourceInGroup(group, datasetId.getId(), entityHashValue);
             if (resSlot < 0) {
                 // if we don't find the resource, there are no locks on it.
                 return true;
@@ -470,8 +470,7 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         final int jobId = txnContext.getJobId().getId();
         final long jobSlot = jobId2JobSlotMap.get(jobId);
 
-        final int dsId = datasetId.getId();
-        unlock(dsId, entityHashValue, lockMode, jobSlot);
+        unlock(datasetId.getId(), entityHashValue, lockMode, jobSlot);
     }
 
     private void unlock(int dsId, int entityHashValue, byte lockMode, long jobSlot) throws ACIDException {
@@ -487,14 +486,16 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
                 throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
             }
 
-            if (CHECK_CONSISTENCY)
+            if (CHECK_CONSISTENCY) {
                 assertLocksCanBefoundInJobQueue();
+            }
 
             long holder = removeLastHolder(resource, jobSlot, lockMode);
 
             // deallocate request
-            if (DEBUG_MODE)
+            if (DEBUG_MODE) {
                 LOGGER.finer("del req slot " + TypeUtil.Global.toString(holder));
+            }
             reqArenaMgr.deallocate(holder);
             // deallocate resource or fix max lock mode
             if (resourceNotUsed(resource)) {
@@ -507,8 +508,9 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
                     }
                     resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
                 }
-                if (DEBUG_MODE)
+                if (DEBUG_MODE) {
                     LOGGER.finer("del res slot " + TypeUtil.Global.toString(resource));
+                }
                 resArenaMgr.deallocate(resource);
             } else {
                 final int oldMaxMode = resArenaMgr.getMaxMode(resource);
@@ -551,8 +553,9 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
                 holder = jobArenaMgr.getLastHolder(jobSlot);
             }
         }
-        if (DEBUG_MODE)
+        if (DEBUG_MODE) {
             LOGGER.finer("del job slot " + TypeUtil.Global.toString(jobSlot));
+        }
         jobArenaMgr.deallocate(jobSlot);
         jobId2JobSlotMap.remove(jobId);
         stats.logCounters(LOGGER, Level.FINE, true);
@@ -562,16 +565,18 @@ public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent
         Long jobSlot = jobId2JobSlotMap.get(jobId);
         if (jobSlot == null) {
             jobSlot = new Long(jobArenaMgr.allocate());
-            if (DEBUG_MODE)
+            if (DEBUG_MODE) {
                 LOGGER.finer("new job slot " + TypeUtil.Global.toString(jobSlot) + " (" + jobId + ")");
+            }
             jobArenaMgr.setJobId(jobSlot, jobId);
             Long oldSlot = jobId2JobSlotMap.putIfAbsent(jobId, jobSlot);
             if (oldSlot != null) {
                 // if another thread allocated a slot for this jobThreadId between
                 // get(..) and putIfAbsent(..), we'll use that slot and
                 // deallocate the one we allocated
-                if (DEBUG_MODE)
+                if (DEBUG_MODE) {
                     LOGGER.finer("del job slot " + TypeUtil.Global.toString(jobSlot) + " due to conflict");
+                }
                 jobArenaMgr.deallocate(jobSlot);
                 jobSlot = oldSlot;
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 424e800..efcff05 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -62,8 +62,8 @@ public class LogBuffer implements ILogBuffer {
     private final LinkedBlockingQueue<ILogRecord> remoteJobsQ;
     private FileChannel fileChannel;
     private boolean stop;
-    private final DatasetId reusableDsId;
     private final JobId reusableJobId;
+    private final DatasetId reusableDatasetId;
 
     public LogBuffer(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
         this.txnSubsystem = txnSubsystem;
@@ -80,8 +80,8 @@ public class LogBuffer implements ILogBuffer {
         syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
         flushQ = new LinkedBlockingQueue<>();
         remoteJobsQ = new LinkedBlockingQueue<>();
-        reusableDsId = new DatasetId(-1);
         reusableJobId = new JobId(-1);
+        reusableDatasetId = new DatasetId(-1);
     }
 
     ////////////////////////////////////
@@ -255,18 +255,13 @@ public class LogBuffer implements ILogBuffer {
             LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogSource() == LogSource.LOCAL) {
-                    if (logRecord.getLogType() == LogType.ENTITY_COMMIT
-                            || logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) {
+                    if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                         reusableJobId.setId(logRecord.getJobId());
+                        reusableDatasetId.setId(logRecord.getDatasetId());
                         txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
-                        reusableDsId.setId(logRecord.getDatasetId());
-                        txnSubsystem.getLockManager().unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY,
-                                txnCtx);
+                        txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(),
+                                LockMode.ANY, txnCtx);
                         txnCtx.notifyOptracker(false);
-                        if (logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) {
-                            // since this operation consisted of delete and insert, we need to notify the optracker twice
-                            txnCtx.notifyOptracker(false);
-                        }
                         if (TransactionUtil.PROFILE_MODE) {
                             txnSubsystem.incrementEntityCommitCount();
                         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index d885f00..e3ad3dd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -48,7 +48,6 @@ public class LogManagerWithReplication extends LogManager {
         if (shouldReplicate) {
             switch (logRecord.getLogType()) {
                 case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
                 case LogType.UPDATE:
                 case LogType.FLUSH:
                     shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
index bd4a49a..9cb54af 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnId.java
@@ -20,7 +20,7 @@ package org.apache.asterix.transaction.management.service.recovery;
 
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -165,7 +165,7 @@ public class TxnId {
 
     public int getCurrentSize() {
         //job id, dataset id, pkHashValue, arraySize, isByteArrayPKValue
-        int size = JobId.BYTES + DatasetId.BYTES + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
+        int size = JobId.BYTES + ILogRecord.DS_LEN + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
         //byte arraySize
         if (isByteArrayPKValue && byteArrayPKValue != null) {
             size += byteArrayPKValue.length;


[3/3] asterixdb git commit: Fix transaction logs and optimize upserts

Posted by am...@apache.org.
Fix transaction logs and optimize upserts

Previously, transaction logs did not record the previous image,
which made it difficult to undo aborted transactions
correctly. This change fixes that by always recording
the previous image.

In addition, an upsert was performed as a delete (if the record
was found) followed by an insert, which produced two logs. This
change makes it a single operation with a single transaction log.

Change-Id: Ice5296267033cd7debe76894c864c6411f761d83
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1554
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/04075533
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/04075533
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/04075533

Branch: refs/heads/master
Commit: 04075533ae042fa2741a190f37625c308d90a9d2
Parents: 89328a8
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Mon Mar 13 22:44:59 2017 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Tue Mar 14 10:37:29 2017 -0700

----------------------------------------------------------------------
 .../algebra/operators/CommitOperator.java       | 15 +----
 .../operators/physical/CommitPOperator.java     | 11 +---
 .../rules/SetupCommitExtensionOpRule.java       | 38 +----------
 .../apache/asterix/app/nc/RecoveryManager.java  | 29 +++++----
 .../app/bootstrap/TestNodeController.java       | 38 +++++------
 .../asterix/test/dataflow/LogMarkerTest.java    | 13 ++--
 .../asterix/test/logging/CheckpointingTest.java | 17 ++---
 .../transactions/AbstractOperationCallback.java |  4 +-
 .../asterix/common/transactions/DatasetId.java  | 19 ++----
 .../asterix/common/transactions/ILogRecord.java |  1 -
 .../transactions/ITransactionManager.java       | 14 ++--
 .../common/transactions/ImmutableDatasetId.java | 33 ++++++++++
 .../asterix/common/transactions/LogRecord.java  |  8 +--
 .../asterix/common/transactions/LogType.java    |  4 --
 asterixdb/asterix-installer/pom.xml             |  1 -
 .../src/main/assembly/binary-assembly.xml       |  1 -
 .../apache/asterix/metadata/MetadataNode.java   | 25 ++++----
 .../asterix/metadata/api/IMetadataIndex.java    |  4 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |  6 +-
 .../metadata/bootstrap/MetadataIndex.java       |  7 +-
 .../metadata/declared/MetadataProvider.java     | 30 +++++----
 .../asterix/metadata/entities/Dataset.java      | 26 ++++----
 .../management/ReplicationChannel.java          |  1 -
 .../job/listener/JobEventListenerFactory.java   |  2 +-
 ...tiTransactionJobletEventListenerFactory.java |  2 +-
 .../LSMPrimaryUpsertOperatorNodePushable.java   | 39 +++++++-----
 .../LSMSecondaryUpsertOperatorNodePushable.java | 14 +++-
 .../std/FlushDatasetOperatorDescriptor.java     |  3 +-
 asterixdb/asterix-transactions/pom.xml          |  5 --
 ...tractIndexModificationOperationCallback.java | 66 ++++++++++++++-----
 .../LockThenSearchOperationCallback.java        |  4 +-
 .../LockThenSearchOperationCallbackFactory.java |  3 +-
 ...maryIndexInstantSearchOperationCallback.java |  5 +-
 ...exInstantSearchOperationCallbackFactory.java |  3 +-
 ...imaryIndexModificationOperationCallback.java | 21 ++----
 ...dexModificationOperationCallbackFactory.java | 16 ++---
 .../PrimaryIndexSearchOperationCallback.java    |  3 +-
 ...maryIndexSearchOperationCallbackFactory.java | 13 ++--
 ...ndaryIndexModificationOperationCallback.java | 17 ++---
 ...dexModificationOperationCallbackFactory.java | 20 +++---
 .../SecondaryIndexSearchOperationCallback.java  |  3 +-
 ...tasetIndexModificationOperationCallback.java | 10 ++-
 ...dexModificationOperationCallbackFactory.java | 17 ++---
 ...dexModificationOperationCallbackFactory.java | 17 ++---
 .../opcallbacks/UpsertOperationCallback.java    | 14 ++--
 .../UpsertOperationCallbackFactory.java         | 20 +++---
 .../runtime/CommitRuntimeFactory.java           | 10 +--
 .../management/runtime/UpsertCommitRuntime.java | 54 ----------------
 .../service/locking/ConcurrentLockManager.java  | 67 +++++++++++---------
 .../management/service/logging/LogBuffer.java   | 17 ++---
 .../logging/LogManagerWithReplication.java      |  1 -
 .../management/service/recovery/TxnId.java      |  4 +-
 .../management/service/locking/Request.java     | 14 ++++
 asterixdb/asterix-yarn/pom.xml                  |  2 -
 .../AbstractOneInputOneOutputPushRuntime.java   |  1 -
 .../api/IModificationOperationCallback.java     | 16 +----
 .../am/common/impls/NoOpOperationCallback.java  |  5 --
 .../storage/am/lsm/btree/impls/LSMBTree.java    |  2 +-
 .../am/lsm/common/api/ILSMIndexAccessor.java    |  9 +++
 .../lsm/common/impls/LSMTreeIndexAccessor.java  |  6 ++
 .../impls/LSMInvertedIndexAccessor.java         |  5 ++
 ...stractModificationOperationCallbackTest.java |  5 --
 .../am/common/TestOperationCallback.java        |  6 --
 63 files changed, 419 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
index 1ee130c..6578c9c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
@@ -32,24 +32,20 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionRef
 public class CommitOperator extends AbstractDelegatedLogicalOperator {
 
     private List<LogicalVariable> primaryKeyLogicalVars;
-    private LogicalVariable upsertVar;
     private boolean isSink;
 
     public CommitOperator(boolean isSink) {
         this.isSink = isSink;
-        this.upsertVar = null;
         primaryKeyLogicalVars = new ArrayList<>();
     }
 
-    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
+    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
-        this.upsertVar = upsertVar;
         this.isSink = isSink;
     }
 
     @Override
     public boolean isMap() {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -62,20 +58,18 @@ public class CommitOperator extends AbstractDelegatedLogicalOperator {
     }
 
     //Provided for Extensions but not used by core
-    public void setVars(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+    public void setVars(List<LogicalVariable> primaryKeyLogicalVars) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
-        this.upsertVar = upsertVar;
     }
 
     @Override
     public IOperatorDelegate newInstance() {
-        return new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
+        return new CommitOperator(primaryKeyLogicalVars, isSink);
     }
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
             throws AlgebricksException {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -87,9 +81,6 @@ public class CommitOperator extends AbstractDelegatedLogicalOperator {
     @Override
     public void getUsedVariables(Collection<LogicalVariable> usedVars) {
         usedVars.addAll(primaryKeyLogicalVars);
-        if (upsertVar != null) {
-            usedVars.add(upsertVar);
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index d0cee55..20c69c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -46,15 +46,12 @@ public class CommitPOperator extends AbstractPhysicalOperator {
     private final List<LogicalVariable> primaryKeyLogicalVars;
     private final JobId jobId;
     private final Dataset dataset;
-    private final LogicalVariable upsertVar;
     private final boolean isSink;
 
-    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars,
-            LogicalVariable upsertVar, boolean isSink) {
+    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
         this.jobId = jobId;
         this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
-        this.upsertVar = upsertVar;
         this.isSink = isSink;
     }
 
@@ -98,12 +95,8 @@ public class CommitPOperator extends AbstractPhysicalOperator {
         for (int i = 0; i < splitsForDataset.length; i++) {
             datasetPartitions[i] = i;
         }
-        int upsertVarIdx = -1;
-        if (upsertVar != null) {
-            upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
-        }
         IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider,
-                upsertVarIdx, datasetPartitions, isSink);
+                datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 9b442ae..18e5f5e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -24,28 +24,21 @@ import java.util.List;
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.algebra.operators.physical.CommitPOperator;
 import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
@@ -73,7 +66,6 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
         List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
         Dataset dataset = null;
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
-        LogicalVariable upsertVar = null;
         while (descendantOp != null) {
             if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
                 IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
@@ -87,32 +79,6 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
                 if (!insertDeleteUpsertOperator.isBulkload()) {
                     primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
                     dataset = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset();
-                    if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
-                        //we need to add a function that checks if previous record was found
-                        upsertVar = context.newVar();
-                        AbstractFunctionCallExpression orFunc =
-                                new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
-                        // is new value missing? -> this means that the expected operation is delete
-                        AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING));
-                        isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
-                        AbstractFunctionCallExpression isPrevMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING));
-                        // argument is the previous record
-                        isPrevMissingFunc.getArguments().add(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isPrevMissingFunc));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
-
-                        // AssignOperator puts in the cast var the casted record
-                        AssignOperator upsertFlagAssign =
-                                new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
-                        // Connect the current top of the plan to the cast operator
-                        upsertFlagAssign.getInputs().add(new MutableObject<>(eOp.getInputs().get(0).getValue()));
-                        eOp.getInputs().clear();
-                        eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
-                        context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
-                    }
                     break;
                 }
             }
@@ -138,9 +104,9 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
         JobId jobId = mp.getJobId();
 
         //create the logical and physical operator
-        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
+        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink);
         CommitPOperator commitPOperator =
-                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, upsertVar, isSink);
+                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 6e4e4cb..07f341d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -57,6 +57,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.Resource;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
@@ -68,7 +69,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -239,7 +239,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     jobCommitLogCount++;
                     break;
                 case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
                     if (partitions.contains(logRecord.getResourcePartition())) {
                         analyzeEntityCommitLog(logRecord);
                         entityCommitLogCount++;
@@ -405,7 +404,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     case LogType.ENTITY_COMMIT:
                     case LogType.ABORT:
                     case LogType.FLUSH:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                     case LogType.WAIT:
                     case LogType.MARKER:
                         //do nothing
@@ -598,13 +596,12 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                         }
                         break;
                     case LogType.ENTITY_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                         if (activePartitions.contains(logRecord.getResourcePartition())) {
                             jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
                             entityCommitLogCount++;
                             if (IS_DEBUG_MODE) {
-                                LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN
-                                        + "]" + tempKeyTxnId);
+                                LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+                                        + tempKeyTxnId);
                             }
                         }
                         break;
@@ -686,10 +683,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+            if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
-                indexAccessor.forceInsert(logRecord.getNewValue());
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) {
+                indexAccessor.forceInsert(logRecord.getOldValue());
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
+                // undo, upsert the old value if found, otherwise, physical delete
+                if (logRecord.getOldValue() == null) {
+                    indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+                } else {
+                    indexAccessor.forceUpsert(logRecord.getOldValue());
+                }
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }
@@ -705,10 +709,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId);
             ILSMIndexAccessor indexAccessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+            if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
+                // redo, upsert the new value
+                indexAccessor.forceUpsert(logRecord.getNewValue());
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index b7026ef..53f5f62 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -53,11 +53,13 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
@@ -159,18 +161,20 @@ public class TestNodeController {
         return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
     }
 
-    public LSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
-            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields,
-            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
+    public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
+            Dataset dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes,
+            List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider)
+            throws AlgebricksException, HyracksDataException {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
                 storageComponentProvider);
         IndexOperation op = IndexOperation.INSERT;
         IModificationOperationCallbackFactory modOpCallbackFactory =
                 new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
-                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
+                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
+                        ResourceType.LSM_BTREE);
         LSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
                 getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
         IIndexDataflowHelperFactory dataflowHelperFactory =
@@ -183,7 +187,7 @@ public class TestNodeController {
                 primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
-        return insertOp;
+        return Pair.of(insertOp, commitOp);
     }
 
     public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
@@ -298,8 +302,7 @@ public class TestNodeController {
         Index index = primaryIndexInfo.getIndex();
         MetadataProvider mdProvider = new MetadataProvider(dataverse, storageComponentProvider);
         return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType,
-                primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory,
-                primaryIndexInfo.mergePolicyProperties);
+                primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties);
     }
 
     public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
@@ -431,11 +434,10 @@ public class TestNodeController {
         private Index index;
         private IStorageComponentProvider storageComponentProvider;
 
-        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
-                ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
-                Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes,
-                List<Integer> primaryKeyIndicators, IStorageComponentProvider storageComponentProvider)
-                throws AlgebricksException {
+        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+                ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+                int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
+                IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
             this.storageComponentProvider = storageComponentProvider;
             this.dataset = dataset;
             this.primaryKeyTypes = primaryKeyTypes;
@@ -474,10 +476,10 @@ public class TestNodeController {
             index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, true,
                     MetadataUtil.PENDING_NO_OP);
-            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider,
-                    index, dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories,
-                    primaryIndexBloomFilterKeyFields, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
-                    filterCmpFactories, btreeFields, filterFields, dataset.getIndexOperationTrackerFactory(index));
+            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, index,
+                    dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields,
+                    mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
+                    filterFields, dataset.getIndexOperationTrackerFactory(index));
         }
 
         public Index getIndex() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1467dbf..1bcc88a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -115,9 +115,10 @@ public class LogMarkerTest {
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
                 ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES,
-                        KEY_INDICATORS_LIST, storageManager);
+                LSMInsertDeleteOperatorNodePushable insertOp = nc
+                        .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+                                null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager)
+                        .getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
@@ -142,9 +143,9 @@ public class LogMarkerTest {
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
-                IIndexDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES,
+                nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+                IIndexDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES, RECORD_TYPE,
+                        META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES,
                         KEY_INDICATORS_LIST);
                 dataflowHelper.open();
                 LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 34bb9cf..35c3c42 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -62,14 +62,14 @@ public class CheckpointingTest {
 
     private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml";
-    private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "target"
-            + File.separator + "config";
+    private static final String TEST_CONFIG_PATH =
+            System.getProperty("user.dir") + File.separator + "target" + File.separator + "config";
     private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
     private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
     private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
             new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
-    private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
-            GenerationFunction.DETERMINISTIC };
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
     private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
     private static final ARecordType META_TYPE = null;
     private static final GenerationFunction[] META_GEN_FUNCTION = null;
@@ -123,9 +123,10 @@ public class CheckpointingTest {
                 nc.newJobId();
                 ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATOR_LIST,
-                        storageManager);
+                LSMInsertDeleteOperatorNodePushable insertOp = nc
+                        .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+                                null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
+                        .getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
@@ -195,7 +196,7 @@ public class CheckpointingTest {
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+                nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
             } finally {
                 nc.deInit();
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
index ba7a780..9844344 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
@@ -32,9 +32,9 @@ public abstract class AbstractOperationCallback {
     protected final ILockManager lockManager;
     protected final long[] longHashes;
 
-    public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
+    public AbstractOperationCallback(DatasetId datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager) {
-        this.datasetId = new DatasetId(datasetId);
+        this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
index 81ece13..da6988c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
@@ -21,27 +21,22 @@ package org.apache.asterix.common.transactions;
 import java.io.Serializable;
 
 public class DatasetId implements Serializable {
-    /**
-     *
-     */
+
     private static final long serialVersionUID = 1L;
-    /**
-     * The number of bytes used to represent {@link DatasetId} value.
-     */
     public static final int BYTES = Integer.BYTES;
-
-    int id;
+    public static final DatasetId NULL = new ImmutableDatasetId(-1);
+    private int id;
 
     public DatasetId(int id) {
         this.id = id;
     }
 
-    public void setId(int id) {
-        this.id = id;
+    public final int getId() {
+        return id;
     }
 
-    public int getId() {
-        return id;
+    public void setId(int id) {
+        this.id = id;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 29af931..6ee0980 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -47,7 +47,6 @@ public interface ILogRecord {
     public static final int SEQ_NUM_LEN = Long.BYTES;
     public static final int TYPE_LEN = Byte.BYTES;
     public static final int UUID_LEN = Long.BYTES;
-    public static final int VBUCKET_ID_LEN = Short.BYTES;
 
     public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
     public static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index ebad94d..0123814 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -70,13 +70,13 @@ public interface ITransactionManager {
      *            the transaction context associated with the transaction
      * @param datasetId
      *            TODO
-     * @param PKHashVal
+     * @param pkHash
      *            TODO
      * @throws ACIDException
      * @see ITransactionContextimport org.apache.hyracks.api.job.JobId;
      * @see ACIDException
      */
-    public void commitTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+    public void commitTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash)
             throws ACIDException;
 
     /**
@@ -86,13 +86,13 @@ public interface ITransactionManager {
      *            the transaction context associated with the transaction
      * @param datasetId
      *            TODO
-     * @param PKHashVal
+     * @param pkHash
      *            TODO
      * @throws ACIDException
      * @see ITransactionContext
      * @see ACIDException
      */
-    public void abortTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+    public void abortTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash)
             throws ACIDException;
 
     /**
@@ -104,15 +104,15 @@ public interface ITransactionManager {
      *            the transaction context associated with the transaction
      * @param datasetId
      *            TODO
-     * @param PKHashVal
+     * @param pkHash
      *            TODO
      * @param success
      *            indicates the success or failure. The transaction is committed
      *            or aborted accordingly.
      * @throws ACIDException
      */
-    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal, boolean success)
-            throws ACIDException;
+    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash,
+            boolean success) throws ACIDException;
 
     /**
      * Returns the Transaction Provider for the transaction eco-system. A

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java
new file mode 100644
index 0000000..c47ca10
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+public class ImmutableDatasetId extends DatasetId { // a DatasetId variant whose setId always throws
+    private static final long serialVersionUID = 1L;
+
+    public ImmutableDatasetId(int datasetId) { // passes the id straight to the DatasetId constructor
+        super(datasetId);
+    }
+
+    @Override
+    public void setId(int datasetId) { // rejects every call; the argument is never used
+        throw new UnsupportedOperationException();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index d691b18..e80cfa6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -133,7 +133,6 @@ public class LogRecord implements ILogRecord {
         buffer.putInt(jobId);
         switch (logType) {
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 writeEntityInfo(buffer);
                 break;
             case LogType.UPDATE:
@@ -252,7 +251,7 @@ public class LogRecord implements ILogRecord {
         jobId = buffer.getInt();
         switch (logType) {
             case LogType.FLUSH:
-                if (buffer.remaining() < DatasetId.BYTES) {
+                if (buffer.remaining() < ILogRecord.DS_LEN) {
                     return RecordReadStatus.TRUNCATED;
                 }
                 datasetId = buffer.getInt();
@@ -268,7 +267,6 @@ public class LogRecord implements ILogRecord {
                 computeAndSetLogSize();
                 break;
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 if (readEntityInfo(buffer)) {
                     computeAndSetLogSize();
                 } else {
@@ -308,6 +306,7 @@ public class LogRecord implements ILogRecord {
                         oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
                     } else {
                         oldValueSize = 0;
+                        oldValue = null;
                     }
                 } else {
                     return RecordReadStatus.TRUNCATED;
@@ -428,7 +427,6 @@ public class LogRecord implements ILogRecord {
                 logSize = JOB_TERMINATE_LOG_SIZE;
                 break;
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
                 break;
             case LogType.FLUSH:
@@ -457,7 +455,7 @@ public class LogRecord implements ILogRecord {
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
-        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT || logType == LogType.UPDATE) {
+        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
             builder.append(" ResourcePartition : ").append(resourcePartition);
             builder.append(" PKHashValue : ").append(PKHashValue);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 269e4b9..11c45ad 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -25,7 +25,6 @@ public class LogType {
     public static final byte ENTITY_COMMIT = 2;
     public static final byte ABORT = 3;
     public static final byte FLUSH = 4;
-    public static final byte UPSERT_ENTITY_COMMIT = 5;
     public static final byte WAIT = 6;
     public static final byte MARKER = 7;
 
@@ -34,7 +33,6 @@ public class LogType {
     private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
     private static final String STRING_ABORT = "ABORT";
     private static final String STRING_FLUSH = "FLUSH";
-    private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
     private static final String STRING_WAIT = "WAIT";
     private static final String STRING_MARKER = "MARKER";
     private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
@@ -51,8 +49,6 @@ public class LogType {
                 return STRING_ABORT;
             case LogType.FLUSH:
                 return STRING_FLUSH;
-            case LogType.UPSERT_ENTITY_COMMIT:
-                return STRING_UPSERT_ENTITY_COMMIT;
             case LogType.WAIT:
                 return STRING_WAIT;
             case LogType.MARKER:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-installer/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/pom.xml b/asterixdb/asterix-installer/pom.xml
index 8b200bb..a6e7541 100644
--- a/asterixdb/asterix-installer/pom.xml
+++ b/asterixdb/asterix-installer/pom.xml
@@ -106,7 +106,6 @@
             <dependencySet>
               <includes>
                 <!-- NOTE! Any changes here must be mirrored in src/main/assembly/binary-assembly.xml -->
-                <include>org.apache.hadoop:hadoop-core</include>
                 <include>commons-cli:commons-cli</include>
                 <include>commons-logging:commons-logging</include>
               </includes>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml b/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
index 88f2b70..8d3844a 100644
--- a/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
+++ b/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
@@ -104,7 +104,6 @@
     <dependencySet>
       <includes>
         <!-- NOTE! Any changes here must be mirrored in asterix-installer pom.xml for licensegen -->
-        <include>org.apache.hadoop:hadoop-core</include>
         <include>commons-cli:commons-cli</include>
         <include>commons-logging:commons-logging</include>
       </includes>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 51790e6..7a04ce8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -41,6 +41,7 @@ import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -94,6 +95,7 @@ import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.AbstractComplexType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -114,7 +116,6 @@ import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.api.TreeIndexException;
 import org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -123,8 +124,8 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
 
-    private static final DatasetId METADATA_DATASET_ID = new DatasetId(
-            MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId());
+    private static final DatasetId METADATA_DATASET_ID =
+            new ImmutableDatasetId(MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId());
 
     // shared between core and extension
     private IDatasetLifecycleManager datasetLifecycleManager;
@@ -167,7 +168,7 @@ public class MetadataNode implements IMetadataNode {
     @Override
     public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
         ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-        transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
+        transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, DatasetId.NULL, -1);
     }
 
     @Override
@@ -175,7 +176,7 @@ public class MetadataNode implements IMetadataNode {
         try {
             ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
                     false);
-            transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
+            transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, DatasetId.NULL, -1);
         } catch (ACIDException e) {
             e.printStackTrace();
             throw e;
@@ -407,7 +408,7 @@ public class MetadataNode implements IMetadataNode {
 
     private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException, IndexException {
-        long resourceID = metadataIndex.getResourceID();
+        long resourceID = metadataIndex.getResourceId();
         String resourceName = metadataIndex.getFile().getRelativePath();
         ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
@@ -415,7 +416,7 @@ public class MetadataNode implements IMetadataNode {
 
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
-                    metadataIndex, lsmIndex, IndexOperation.INSERT);
+                    metadataIndex, lsmIndex, Operation.INSERT);
 
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
@@ -435,16 +436,16 @@ public class MetadataNode implements IMetadataNode {
     }
 
     private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
-            IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws ACIDException {
+            IMetadataIndex metadataIndex, ILSMIndex lsmIndex, Operation indexOp) throws ACIDException {
         ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
 
         // Regardless of the index type (primary or secondary index), secondary index modification callback is given
         // This is still correct since metadata index operation doesn't require any lock from ConcurrentLockMgr and
         // The difference between primaryIndexModCallback and secondaryIndexModCallback is that primary index requires
         // locks and secondary index doesn't.
-        return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+        return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId(),
                 metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp, false);
+                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
     }
 
     @Override
@@ -679,14 +680,14 @@ public class MetadataNode implements IMetadataNode {
 
     private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException, IndexException {
-        long resourceID = metadataIndex.getResourceID();
+        long resourceID = metadataIndex.getResourceId();
         String resourceName = metadataIndex.getFile().getRelativePath();
         ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
             datasetLifecycleManager.open(resourceName);
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
-                    metadataIndex, lsmIndex, IndexOperation.DELETE);
+                    metadataIndex, lsmIndex, Operation.DELETE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
             ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
index 9eec64c..6c220af 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
@@ -75,9 +75,9 @@ public interface IMetadataIndex extends Serializable {
 
     public int getFileId();
 
-    public void setResourceID(long resourceID);
+    public void setResourceId(long resourceId);
 
-    public long getResourceID();
+    public long getResourceId();
 
     public DatasetId getDatasetId();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 0410911..15a2783 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -364,7 +364,7 @@ public class MetadataBootstrap {
                     ioOpCallbackFactory.createIoOpCallback(), index.isPrimaryIndex(), null, null, null, null, true,
                     appContext.getStorageComponentProvider().getMetadataPageManagerFactory());
             lsmBtree.create();
-            resourceID = index.getResourceID();
+            resourceID = index.getResourceId();
             Resource localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits, comparatorFactories,
                     bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(),
                     metadataPartition.getPartitionId(), appContext.getMetadataMergePolicyFactory(),
@@ -385,7 +385,7 @@ public class MetadataBootstrap {
                         + " to intialize as a new instance. (WARNING: all data will be lost.)");
             }
             resourceID = resource.getId();
-            if (index.getResourceID() != resource.getId()) {
+            if (index.getResourceId() != resource.getId()) {
                 throw new HyracksDataException("Resource Id doesn't match expected metadata index resource id");
             }
             lsmBtree = (LSMBTree) dataLifecycleManager.get(file.getRelativePath());
@@ -402,7 +402,7 @@ public class MetadataBootstrap {
                 dataLifecycleManager.register(file.getRelativePath(), lsmBtree);
             }
         }
-        index.setResourceID(resourceID);
+        index.setResourceId(resourceID);
         index.setFile(file);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index 80a670e..ee5c9f3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -147,7 +148,7 @@ public class MetadataIndex implements IMetadataIndex {
             }
         }
 
-        this.datasetId = new DatasetId(indexImmutableProperties.getDatasetId());
+        this.datasetId = new ImmutableDatasetId(indexImmutableProperties.getDatasetId());
         this.isPrimaryIndex = isPrimaryIndex;
 
         //PrimaryKeyFieldIndexes
@@ -260,12 +261,12 @@ public class MetadataIndex implements IMetadataIndex {
     }
 
     @Override
-    public void setResourceID(long resourceID) {
+    public void setResourceId(long resourceID) {
         this.resourceId = resourceID;
     }
 
     @Override
-    public long getResourceID() {
+    public long getResourceId() {
         return resourceId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 6901e1d..39ea54d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -89,6 +89,7 @@ import org.apache.asterix.runtime.operators.LSMTreeUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -1092,9 +1093,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+                            primaryKeyFields, txnSubsystemProvider, Operation.UPSERT, ResourceType.LSM_BTREE)
                     : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
-                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+                            Operation.UPSERT, ResourceType.LSM_BTREE);
 
             LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
@@ -1288,11 +1289,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
                             ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                            primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE)
                     : new PrimaryIndexModificationOperationCallbackFactory(
                             ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                            primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1486,10 +1486,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_BTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1643,10 +1644,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_RTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
-                            dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_RTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1852,11 +1854,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
                             ResourceType.LSM_INVERTED_INDEX)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_INVERTED_INDEX);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 34faf63..ad75b6b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -60,6 +60,7 @@ import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.RTreeDataflowHelperFactoryProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -517,23 +518,26 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             IStorageComponentProvider componentProvider, Index index, JobId jobId, IndexOperation op,
             int[] primaryKeyFields) throws AlgebricksException {
         if (getDatasetDetails().isTemp()) {
-            return new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
-                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), op, index.resourceType());
+            return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
+                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
+                            primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType())
+                    : NoOpOperationCallbackFactory.INSTANCE;
         } else if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT
                     ? new UpsertOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
-                            componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
-                            hasMetaPart())
+                            componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType())
                     : op == IndexOperation.DELETE || op == IndexOperation.INSERT
                             ? new PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
-                                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), op,
-                                    index.resourceType(), hasMetaPart())
+                                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
                             : NoOpOperationCallbackFactory.INSTANCE;
         } else {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
                     ? new SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
-                            componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
-                            hasMetaPart())
+                            componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType())
                     : NoOpOperationCallbackFactory.INSTANCE;
         }
     }
@@ -574,10 +578,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
     }
 
     public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields,
-            MetadataProvider metadataProvider, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+            MetadataProvider metadataProvider, int[] datasetPartitions, boolean isSink) {
         return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions, isSink);
+                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions,
+                isSink);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index f02496a..dd40b04 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -462,7 +462,6 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 switch (remoteLog.getLogType()) {
                     case LogType.UPDATE:
                     case LogType.ENTITY_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                         //if the log partition belongs to a partitions hosted on this node, replicate it
                         if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
                             logManager.log(remoteLog);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 7ec5691..b182add 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -55,7 +55,7 @@ public class JobEventListenerFactory implements IJobletEventListenerFactory {
                             .getApplicationContext()).getTransactionSubsystem().getTransactionManager();
                     ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
                     txnContext.setWriteTxn(transactionalWrite);
-                    txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
+                    txnManager.completedTransaction(txnContext, DatasetId.NULL, -1,
                             !(jobStatus == JobStatus.FAILURE));
                 } catch (ACIDException e) {
                     throw new Error(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index ac1895b..c194d64 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -59,7 +59,7 @@ public class MultiTransactionJobletEventListenerFactory implements IJobletEventL
                     for (JobId jobId : jobIds) {
                         ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
                         txnContext.setWriteTxn(transactionalWrite);
-                        txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
+                        txnManager.completedTransaction(txnContext, DatasetId.NULL, -1,
                                 !(jobStatus == JobStatus.FAILURE));
                     }
                 } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 704ee52..1f18c97 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -31,6 +31,8 @@ import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -49,7 +51,6 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -88,6 +89,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     private LockThenSearchOperationCallback searchCallback;
     private IFrameOperationCallback frameOpCallback;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
+    private AbstractIndexModificationOperationCallback abstractModCallback;
 
     public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
             int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
@@ -149,9 +151,11 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             modCallback = opDesc.getModificationOpCallbackFactory()
                     .createModificationOperationCallback(indexHelper.getResource(), ctx, this);
+            abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
             searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
-            indexAccessor = index.createAccessor(modCallback, searchCallback);
+            indexAccessor = index.createAccessor(abstractModCallback, searchCallback);
+
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             IAppRuntimeContext appCtx =
@@ -172,12 +176,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     }
 
     private void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
-        frameTuple.reset(accessor, tupleIndex);
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
         if (recordWasInserted || recordWasDeleted) {
+            frameTuple.reset(accessor, tupleIndex);
+            for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                tb.addFieldEndOffset();
+            }
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
         } else {
             try {
@@ -233,12 +237,15 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                                 prevTuple.getFieldLength(filterFieldIndex));
                         tb.addFieldEndOffset();
                     }
-                    modCallback.setOp(Operation.DELETE);
-                    if (firstModification) {
-                        lsmAccessor.delete(prevTuple);
-                        firstModification = false;
-                    } else {
-                        lsmAccessor.forceDelete(prevTuple);
+                    if (isNull(tuple, numOfPrimaryKeys)) {
+                        // Delete the previous tuple only when this is a delete operation, not an upsert
+                        abstractModCallback.setOp(Operation.DELETE);
+                        if (firstModification) {
+                            lsmAccessor.delete(prevTuple);
+                            firstModification = false;
+                        } else {
+                            lsmAccessor.forceDelete(prevTuple);
+                        }
                     }
                 } else {
                     prevTuple = null;
@@ -253,12 +260,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                     cursor.reset();
                 }
                 if (!isNull(tuple, numOfPrimaryKeys)) {
-                    modCallback.setOp(Operation.INSERT);
+                    abstractModCallback.setOp(Operation.UPSERT);
                     if (firstModification) {
-                        lsmAccessor.insert(tuple);
+                        lsmAccessor.upsert(tuple);
                         firstModification = false;
                     } else {
-                        lsmAccessor.forceInsert(tuple);
+                        lsmAccessor.forceUpsert(tuple);
                     }
                     recordWasInserted = true;
                 }