You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/01 09:31:21 UTC

[02/13] incubator-asterixdb git commit: Add Support for Upsert Operation

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
index b1fbf7c..d4c93ff 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
@@ -51,14 +51,9 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
  * Multiple format strings can be used by separating them using <b>|(bar)</b>, and the parsing will be successful only when the format string has the <b>exact</b> match with the given data string. This means that a time string like <it>08:23:12 AM</it> will not be valid for the format string <it>h:m:s</it> as there is no AM/PM format character in the format string.
  */
 public class ParseDateDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PARSE_DATE;
-
-    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
     private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
-
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
         @Override
@@ -101,20 +96,21 @@ public class ParseDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
                         eval1.evaluate(tuple);
 
                         try {
-                            if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
-                                    || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+                                    || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (argOut0.getByteArray()[0] != SER_STRING_TYPE_TAG
-                                    || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": expects two strings but got  ("
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
-                                        + ", "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
-                                        + ")");
+                            if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG
+                                    || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+                                throw new AlgebricksException(
+                                        getIdentifier().getName() + ": expects two strings but got  ("
+                                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+                                                        argOut0.getByteArray()[0])
+                                                + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                        .deserialize(argOut1.getByteArray()[0])
+                                                + ")");
                             }
                             utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength() - 1);
                             int start0 = utf8Ptr.getCharStartOffset();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
index 2a7be96..bdd0783 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
@@ -46,14 +46,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class ParseDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PARSE_DATETIME;
-
-    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
     private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
-
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
         @Override
@@ -96,26 +91,27 @@ public class ParseDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
                         eval1.evaluate(tuple);
 
                         try {
-                            if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
-                                    || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+                                    || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (argOut0.getByteArray()[0] != SER_STRING_TYPE_TAG
-                                    || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": expects two strings but got  ("
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
-                                        + ", "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
-                                        + ")");
+                            if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG
+                                    || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+                                throw new AlgebricksException(
+                                        getIdentifier().getName() + ": expects two strings but got  ("
+                                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+                                                        argOut0.getByteArray()[0])
+                                                + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                        .deserialize(argOut1.getByteArray()[0])
+                                                + ")");
                             }
-                            utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength()-1);
+                            utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength() - 1);
                             int start0 = utf8Ptr.getCharStartOffset();
                             int length0 = utf8Ptr.getUTF8Length();
 
-                            utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength()-1);
+                            utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() - 1);
                             int start1 = utf8Ptr.getCharStartOffset();
                             int length1 = utf8Ptr.getUTF8Length();
                             long chronon = 0;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
index 007ddca..db6e8b3 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
@@ -46,12 +46,8 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class ParseTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PARSE_TIME;
-
-    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
     private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
 
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -96,26 +92,27 @@ public class ParseTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
                         eval1.evaluate(tuple);
 
                         try {
-                            if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
-                                    || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+                                    || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (argOut0.getByteArray()[0] != SER_STRING_TYPE_TAG
-                                    || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": expects two strings but got  ("
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
-                                        + ", "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
-                                        + ")");
+                            if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG
+                                    || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+                                throw new AlgebricksException(
+                                        getIdentifier().getName() + ": expects two strings but got  ("
+                                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+                                                        argOut0.getByteArray()[0])
+                                                + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                        .deserialize(argOut1.getByteArray()[0])
+                                                + ")");
                             }
-                            utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength()-1);
+                            utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength() - 1);
                             int start0 = utf8Ptr.getCharStartOffset();
                             int length0 = utf8Ptr.getUTF8Length();
 
-                            utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() -1);
+                            utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() - 1);
                             int start1 = utf8Ptr.getCharStartOffset();
                             int length1 = utf8Ptr.getUTF8Length();
                             long chronon = 0;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
index d731fff..d15534d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
@@ -46,13 +46,8 @@ import org.apache.hyracks.util.string.UTF8StringUtil;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PRINT_DATE;
-
-    private final static byte SER_DATE_TYPE_TAG = ATypeTag.DATE.serialize();
-    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
     private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
 
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -94,20 +89,21 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
                         eval1.evaluate(tuple);
 
                         try {
-                            if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
-                                    || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+                                    || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (argOut0.getByteArray()[0] != SER_DATE_TYPE_TAG
-                                    || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": expects (DATE, STRING) but got  ("
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
-                                        + ", "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
-                                        + ")");
+                            if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_DATE_TYPE_TAG
+                                    || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+                                throw new AlgebricksException(
+                                        getIdentifier().getName() + ": expects (DATE, STRING) but got  ("
+                                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+                                                        argOut0.getByteArray()[0])
+                                                + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                        .deserialize(argOut1.getByteArray()[0])
+                                                + ")");
                             }
 
                             long chronon = ADateSerializerDeserializer.getChronon(argOut0.getByteArray(), 1)
@@ -118,7 +114,7 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
                             DT_UTILS.printDateTime(chronon, 0, argOut1.getByteArray(), 1 + offset, formatLength, sbder,
                                     DateTimeParseMode.DATE_ONLY);
 
-                            out.writeByte(ATypeTag.STRING.serialize());
+                            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
                             utf8Writer.writeUTF8(sbder.toString(), out);
 
                         } catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
index a31e1e5..ba35a42 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
@@ -45,13 +45,8 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PRINT_DATETIME;
-
-    private final static byte SER_DATETIME_TYPE_TAG = ATypeTag.DATETIME.serialize();
-    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
     private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
 
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -94,20 +89,21 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
                         eval1.evaluate(tuple);
 
                         try {
-                            if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
-                                    || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+                                    || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (argOut0.getByteArray()[0] != SER_DATETIME_TYPE_TAG
-                                    || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": expects (DATETIME, STRING) but got  ("
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
-                                        + ", "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
-                                        + ")");
+                            if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_DATETIME_TYPE_TAG
+                                    || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+                                throw new AlgebricksException(
+                                        getIdentifier().getName() + ": expects (DATETIME, STRING) but got  ("
+                                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+                                                        argOut0.getByteArray()[0])
+                                                + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                        .deserialize(argOut1.getByteArray()[0])
+                                                + ")");
                             }
                             long chronon = ADateTimeSerializerDeserializer.getChronon(argOut0.getByteArray(), 1);
                             utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() - 1);
@@ -116,7 +112,7 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
                             DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
                                     formatLength, sbder, DateTimeParseMode.DATETIME);
 
-                            out.writeByte(ATypeTag.STRING.serialize());
+                            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
                             utf8Writer.writeUTF8(sbder.toString(), out);
 
                         } catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
index 961c44d..f152401 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
@@ -45,13 +45,8 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PRINT_TIME;
-
-    private final static byte SER_TIME_TYPE_TAG = ATypeTag.TIME.serialize();
-    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
     private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
 
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -94,20 +89,21 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
                         eval1.evaluate(tuple);
 
                         try {
-                            if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
-                                    || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+                                    || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (argOut0.getByteArray()[0] != SER_TIME_TYPE_TAG
-                                    || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": expects (TIME, STRING) but got  ("
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
-                                        + ", "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
-                                        + ")");
+                            if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_TIME_TYPE_TAG
+                                    || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+                                throw new AlgebricksException(
+                                        getIdentifier().getName() + ": expects (TIME, STRING) but got  ("
+                                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+                                                        argOut0.getByteArray()[0])
+                                                + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                        .deserialize(argOut1.getByteArray()[0])
+                                                + ")");
                             }
 
                             long chronon = ATimeSerializerDeserializer.getChronon(argOut0.getByteArray(), 1);
@@ -117,7 +113,7 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
                             DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
                                     formatLength, sbder, DateTimeParseMode.TIME_ONLY);
 
-                            out.writeByte(ATypeTag.STRING.serialize());
+                            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
                             writer.writeUTF8(sbder.toString(), out);
 
                         } catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
index 741eeb1..384cfd7 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
@@ -47,11 +47,6 @@ public class TimeFromDatetimeDescriptor extends AbstractScalarFunctionDynamicDes
 
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.TIME_FROM_DATETIME;
-
-    // allowed input types
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-    private final static byte SER_DATETIME_TYPE_TAG = ATypeTag.DATETIME.serialize();
-
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
         @Override
@@ -93,18 +88,17 @@ public class TimeFromDatetimeDescriptor extends AbstractScalarFunctionDynamicDes
                         argOut.reset();
                         eval.evaluate(tuple);
                         try {
-                            if (argOut.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                             } else {
-                                if (argOut.getByteArray()[0] != SER_DATETIME_TYPE_TAG) {
+                                if (argOut.getByteArray()[0] != ATypeTag.SERIALIZED_DATETIME_TYPE_TAG) {
                                     throw new AlgebricksException(
-                                            FID.getName()
-                                                    + ": expects input type DATETIME/NULL but got "
-                                                    + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut
-                                                            .getByteArray()[0]));
+                                            FID.getName() + ": expects input type DATETIME/NULL but got "
+                                                    + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                            .deserialize(argOut.getByteArray()[0]));
                                 }
-                                long datetimeChronon = ADateTimeSerializerDeserializer.getChronon(
-                                        argOut.getByteArray(), 1);
+                                long datetimeChronon = ADateTimeSerializerDeserializer.getChronon(argOut.getByteArray(),
+                                        1);
                                 int timeChronon = (int) (datetimeChronon % GregorianCalendarSystem.CHRONON_OF_DAY);
                                 if (timeChronon < 0) {
                                     timeChronon += GregorianCalendarSystem.CHRONON_OF_DAY;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
index 6170726..3744873 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
@@ -42,13 +42,8 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class TimeFromUnixTimeInMsDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private final static long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = AsterixBuiltinFunctions.TIME_FROM_UNIX_TIME_IN_MS;
-
-    // allowed input types
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-
     public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
         @Override
@@ -89,7 +84,7 @@ public class TimeFromUnixTimeInMsDescriptor extends AbstractScalarFunctionDynami
                         argOut.reset();
                         eval.evaluate(tuple);
                         try {
-                            if (argOut.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                             } else {
                                 aTime.setValue(ATypeHierarchy.getIntegerValue(argOut.getByteArray(), 0));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
index f23fa79..f75665d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
@@ -42,15 +42,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class YearMonthDurationComparatorDecriptor extends AbstractScalarFunctionDynamicDescriptor {
-
     private final static long serialVersionUID = 1L;
     public final static FunctionIdentifier GREATER_THAN_FID = AsterixBuiltinFunctions.YEAR_MONTH_DURATION_GREATER_THAN;
     public final static FunctionIdentifier LESS_THAN_FID = AsterixBuiltinFunctions.YEAR_MONTH_DURATION_LESS_THAN;
-
-    // allowed input types
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-    private final static byte SER_DURATION_TYPE_TAG = ATypeTag.DURATION.serialize();
-
     private final boolean isGreaterThan;
 
     private YearMonthDurationComparatorDecriptor(boolean isGreaterThan) {
@@ -105,29 +99,31 @@ public class YearMonthDurationComparatorDecriptor extends AbstractScalarFunction
                         eval1.evaluate(tuple);
 
                         try {
-                            if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
-                                    || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+                            if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+                                    || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (argOut0.getByteArray()[0] != SER_DURATION_TYPE_TAG
-                                    || argOut1.getByteArray()[0] != SER_DURATION_TYPE_TAG) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": expects type NULL/DURATION, NULL/DURATION but got "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
-                                        + " and "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0]));
+                            if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_DURATION_TYPE_TAG
+                                    || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_DURATION_TYPE_TAG) {
+                                throw new AlgebricksException(
+                                        getIdentifier().getName()
+                                                + ": expects type NULL/DURATION, NULL/DURATION but got "
+                                                + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+                                                        argOut0.getByteArray()[0])
+                                                + " and " + EnumDeserializer.ATYPETAGDESERIALIZER
+                                                        .deserialize(argOut1.getByteArray()[0]));
                             }
 
                             if ((ADurationSerializerDeserializer.getDayTime(argOut0.getByteArray(), 1) != 0)
                                     || (ADurationSerializerDeserializer.getDayTime(argOut1.getByteArray(), 1) != 0)) {
-                                throw new AlgebricksException(getIdentifier().getName()
-                                        + ": only year-month durations are allowed.");
+                                throw new AlgebricksException(
+                                        getIdentifier().getName() + ": only year-month durations are allowed.");
                             }
 
-                            if (ADurationSerializerDeserializer.getYearMonth(argOut0.getByteArray(), 1) > ADurationSerializerDeserializer
-                                    .getYearMonth(argOut1.getByteArray(), 1)) {
+                            if (ADurationSerializerDeserializer.getYearMonth(argOut0.getByteArray(),
+                                    1) > ADurationSerializerDeserializer.getYearMonth(argOut1.getByteArray(), 1)) {
                                 boolSerde.serialize(isGreaterThan ? ABoolean.TRUE : ABoolean.FALSE, out);
                             } else {
                                 boolSerde.serialize(isGreaterThan ? ABoolean.FALSE : ABoolean.TRUE, out);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
new file mode 100644
index 0000000..3db3de2
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMInvertedIndexUpsertOperatorDescriptor
+        extends AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final int[] prevFieldPermutation;
+
+    public AsterixLSMInvertedIndexUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
+            IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
+            IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
+            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory,
+            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            String indexName, int[] prevFieldPermutation) {
+        super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+                tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+                fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory,
+                modificationOpCallbackFactory, indexName);
+        this.prevFieldPermutation = prevFieldPermutation;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AsterixLSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
+                recordDescProvider, prevFieldPermutation);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
new file mode 100644
index 0000000..f35f4d6
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INullWriter;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+
+public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+
+    private PermutingFrameTupleReference key;
+    private MultiComparator keySearchCmp;
+    private ArrayTupleBuilder nullTupleBuilder;
+    private INullWriter nullWriter;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+    private LSMBTree lsmIndex;
+    private RangePredicate searchPred;
+    private IIndexCursor cursor;
+    private ITupleReference prevTuple;
+    private int numOfPrimaryKeys;
+    boolean isFiltered = false;
+    private ArrayTupleReference prevTupleWithFilter = new ArrayTupleReference();
+    private ArrayTupleBuilder prevRecWithPKWithFilterValue;
+    private ARecordType recordType;
+    private int presetFieldIndex = -1;
+    private ARecordPointable recPointable;
+    private DataOutput prevDos;
+
+    public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
+            ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+        super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
+        // initialize nullWriter
+        this.nullWriter = opDesc.getNullWriterFactory().createNullWriter();
+        // The search key should only have the primary index and not use the permutations.
+        this.key = new PermutingFrameTupleReference();
+        int[] searchKeyPermutations = new int[numOfPrimaryKeys];
+        for (int i = 0; i < searchKeyPermutations.length; i++) {
+            searchKeyPermutations[i] = fieldPermutation[i];
+        }
+        key.setFieldPermutation(searchKeyPermutations);
+        this.numOfPrimaryKeys = numOfPrimaryKeys;
+        if (fieldPermutation.length > numOfPrimaryKeys + 1) {
+            isFiltered = true;
+            this.recordType = recordType;
+            this.presetFieldIndex = filterFieldIndex;
+            this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
+            this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length);
+            this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
+        }
+    }
+
+    // we have the permutation which has [pk locations, record location, optional:filter-location]
+    // the index -> we don't need anymore data?
+    // we need to use the primary index opTracker and secondary indexes callbacks for insert/delete since the lock would
+    // have been obtained through searchForUpsert operation
+
+    @Override
+    public void open() throws HyracksDataException {
+        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        accessor = new FrameTupleAccessor(inputRecDesc);
+        writeBuffer = new VSizeFrame(ctx);
+        writer.open();
+        indexHelper.open();
+        lsmIndex = (LSMBTree) indexHelper.getIndexInstance();
+
+        try {
+            nullTupleBuilder = new ArrayTupleBuilder(1);
+            DataOutput out = nullTupleBuilder.getDataOutput();
+            try {
+                nullWriter.writeNull(out);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            nullTupleBuilder.addFieldEndOffset();
+            searchPred = createSearchPredicate();
+            tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+            modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
+                    indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
+
+            indexAccessor = lsmIndex.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
+                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx));
+            cursor = createCursor();
+            frameTuple = new FrameTupleReference();
+            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                    .getApplicationContext().getApplicationObject();
+            AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
+        } catch (Exception e) {
+            indexHelper.close();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void resetSearchPredicate(int tupleIndex) {
+        key.reset(accessor, tupleIndex);
+    }
+
+    protected void writeOutput(int tupleIndex) throws Exception {
+        tb.reset();
+        frameTuple.reset(accessor, tupleIndex);
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+        if (prevTuple != null) {
+            dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
+                    prevTuple.getFieldLength(numOfPrimaryKeys));
+            tb.addFieldEndOffset();
+            // if with filters, append the filter
+            if (isFiltered) {
+                dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1), prevTuple.getFieldStart(numOfPrimaryKeys + 1),
+                        prevTuple.getFieldLength(numOfPrimaryKeys + 1));
+                tb.addFieldEndOffset();
+            }
+        } else {
+            addNullField();
+            // if with filters, append null
+            if (isFiltered) {
+                addNullField();
+            }
+        }
+        FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+    }
+
+    private void addNullField() throws IOException {
+        dos.write(nullTupleBuilder.getByteArray());
+        tb.addFieldEndOffset();
+    }
+
+    //TODO: use tryDelete/tryInsert in order to prevent deadlocks
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
+        int tupleCount = accessor.getTupleCount();
+
+        try {
+            for (int i = 0; i < tupleCount; i++) {
+                tuple.reset(accessor, i);
+                resetSearchPredicate(i);
+                cursor.reset();
+                lsmAccessor.search(cursor, searchPred);
+                if (cursor.hasNext()) {
+                    cursor.next();
+                    prevTuple = cursor.getTuple();
+                    cursor.reset();
+                    modCallback.setOp(Operation.DELETE);
+                    if (isFiltered) {
+                        prevTuple = getPrevTupleWithFilter(prevTuple);
+                    }
+                    if (i == 0) {
+                        lsmAccessor.delete(prevTuple);
+                    } else {
+                        lsmAccessor.forceDelete(prevTuple);
+                    }
+                } else {
+                    prevTuple = null;
+                }
+                modCallback.setOp(Operation.INSERT);
+                if (prevTuple == null && i == 0) {
+                    lsmAccessor.insert(tuple);
+                } else {
+                    lsmAccessor.forceInsert(tuple);
+                }
+                writeOutput(i);
+            }
+            if (tupleCount > 0) {
+                // All tuples has to move forward to maintain the correctness of the transaction pipeline
+                appender.write(writer, true);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private ITupleReference getPrevTupleWithFilter(ITupleReference prevTuple) throws IOException, AsterixException {
+        prevRecWithPKWithFilterValue.reset();
+        for (int i = 0; i < prevTuple.getFieldCount(); i++) {
+            prevDos.write(prevTuple.getFieldData(i), prevTuple.getFieldStart(i), prevTuple.getFieldLength(i));
+            prevRecWithPKWithFilterValue.addFieldEndOffset();
+        }
+        recPointable.set(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
+                prevTuple.getFieldLength(numOfPrimaryKeys));
+        // copy the field data from prevTuple
+        prevDos.write(recPointable.getClosedFieldType(recordType, presetFieldIndex).getTypeTag().serialize());
+        prevDos.write(recPointable.getByteArray(), recPointable.getClosedFieldOffset(recordType, presetFieldIndex),
+                recPointable.getClosedFieldSize(recordType, presetFieldIndex));
+        prevRecWithPKWithFilterValue.addFieldEndOffset();
+        // prepare the tuple
+        prevTupleWithFilter.reset(prevRecWithPKWithFilterValue.getFieldEndOffsets(),
+                prevRecWithPKWithFilterValue.getByteArray());
+        return prevTupleWithFilter;
+    }
+
+    private RangePredicate createSearchPredicate() {
+        keySearchCmp = BTreeUtils.getSearchMultiComparator(lsmIndex.getComparatorFactories(), key);
+        return new RangePredicate(key, key, true, true, keySearchCmp, keySearchCmp, null, null);
+    }
+
+    protected IIndexCursor createCursor() {
+        return indexAccessor.createSearchCursor(false);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            cursor.close();
+            writer.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            indexHelper.close();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
new file mode 100644
index 0000000..65dc83f
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+
+/**
+ * This operator node is used for secondary indexes with upsert operations.
+ * It works in the following way:
+ * For each incoming tuple
+ * -If old secondary keys == new secondary keys
+ * --do nothing
+ * -else
+ * --If old secondary keys are null?
+ * ---do nothing
+ * --else
+ * ---delete old secondary keys
+ * --If new keys are null?
+ * ---do nothing
+ * --else
+ * ---insert new keys
+ */
+public class AsterixLSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+
+    private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference();
+    private int numberOfFields;
+    private boolean isNewNull = false;
+    private boolean isPrevValueNull = false;
+
+    public AsterixLSMSecondaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
+            int[] prevValuePermutation) {
+        super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
+        this.prevValueTuple.setFieldPermutation(prevValuePermutation);
+        this.numberOfFields = prevValuePermutation.length;
+    }
+
+    public static boolean equals(byte[] a, int aOffset, int aLength, byte[] b, int bOffset, int bLength) {
+        if (a.length != b.length) {
+            return false;
+        }
+        for (int i = 0; i < a.length; i++) {
+            if (a[aOffset + i] != b[bOffset + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean equalTuples(PermutingFrameTupleReference t1, PermutingFrameTupleReference t2, int numOfFields)
+            throws HyracksDataException {
+        byte[] t1Data = t1.getFieldData(0);
+        byte[] t2Data = t2.getFieldData(0);
+        for (int i = 0; i < numOfFields; i++) {
+            if (!equals(t1Data, t1.getFieldStart(i), t1.getFieldLength(i), t2Data, t2.getFieldStart(i),
+                    t2.getFieldLength(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isNull(PermutingFrameTupleReference t1) {
+        return t1.getFieldData(0)[t1.getFieldStart(0)] == ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            try {
+                // if both previous value and new value are null, then we skip
+                tuple.reset(accessor, i);
+                prevValueTuple.reset(accessor, i);
+                isNewNull = isNull(tuple);
+                isPrevValueNull = isNull(prevValueTuple);
+                if (isNewNull && isPrevValueNull) {
+                    continue;
+                }
+                // At least, one is not null
+                // If they are equal, then we skip
+                if (equalTuples(tuple, prevValueTuple, numberOfFields)) {
+                    continue;
+                }
+                if (!isPrevValueNull) {
+                    // previous is not null, we need to delete previous
+                    modCallback.setOp(Operation.DELETE);
+                    lsmAccessor.forceDelete(prevValueTuple);
+                }
+                if (!isNewNull) {
+                    // new is not null, we need to insert the new value
+                    modCallback.setOp(Operation.INSERT);
+                    lsmAccessor.forceInsert(tuple);
+                }
+
+            } catch (HyracksDataException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        // No partial flushing was necessary. Forward entire frame.
+        writeBuffer.ensureFrameSize(buffer.capacity());
+        FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+        FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
new file mode 100644
index 0000000..803e15d
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMTreeUpsertOperatorDescriptor extends AsterixLSMTreeInsertDeleteOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final int[] prevValuePermutation;
+    private ARecordType type;
+    private int filterIndex = -1;
+
+    public AsterixLSMTreeUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
+            IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
+            boolean isPrimary, String indexName, INullWriterFactory nullWriterFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackProvider,
+            ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation) {
+        super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
+                dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, nullWriterFactory,
+                modificationOpCallbackProvider, searchOpCallbackProvider);
+        this.prevValuePermutation = prevValuePermutation;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return isPrimary()
+                ? new AsterixLSMPrimaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
+                        recordDescProvider, comparatorFactories.length, type, filterIndex)
+                : new AsterixLSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
+                        recordDescProvider, prevValuePermutation);
+    }
+
+    public void setType(ARecordType type) {
+        this.type = type;
+    }
+
+    public void setFilterIndex(int filterIndex) {
+        this.filterIndex = filterIndex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
index bb87cf4..88d9a4e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
@@ -47,14 +47,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SubsetCollectionDescriptor extends AbstractUnnestingFunctionDynamicDescriptor {
-
     private static final long serialVersionUID = 1L;
-
-    private final static byte SER_ORDEREDLIST_TYPE_TAG = ATypeTag.ORDEREDLIST.serialize();
-    private final static byte SER_UNORDEREDLIST_TYPE_TAG = ATypeTag.UNORDEREDLIST.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SubsetCollectionDescriptor();
         }
@@ -105,16 +100,17 @@ public class SubsetCollectionDescriptor extends AbstractUnnestingFunctionDynamic
 
                             byte[] serList = inputVal.getByteArray();
 
-                            if (serList[0] == SER_NULL_TYPE_TAG) {
+                            if (serList[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                                 nullSerde.serialize(ANull.NULL, out);
                                 return;
                             }
 
-                            if (serList[0] != SER_ORDEREDLIST_TYPE_TAG && serList[0] != SER_UNORDEREDLIST_TYPE_TAG) {
+                            if (serList[0] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
+                                    && serList[0] != ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
                                 throw new AlgebricksException("Subset-collection is not defined for values of type"
                                         + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[0]));
                             }
-                            if (serList[0] == SER_ORDEREDLIST_TYPE_TAG)
+                            if (serList[0] == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG)
                                 numItemsMax = AOrderedListSerializerDeserializer.getNumberOfItems(serList);
                             else
                                 numItemsMax = AUnorderedListSerializerDeserializer.getNumberOfItems(serList);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index a07a109..3b5630f 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -26,12 +26,16 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
 public abstract class AbstractIndexModificationOperationCallback extends AbstractOperationCallback {
 
+    private static final byte INSERT_OP = (byte) IndexOperation.INSERT.ordinal();
+    private static final byte DELETE_OP = (byte) IndexOperation.DELETE.ordinal();
     protected final long resourceId;
     protected final byte resourceType;
     protected final IndexOperation indexOp;
@@ -72,4 +76,15 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
         logRecord.computeAndSetLogSize();
         txnSubsystem.getLogManager().log(logRecord);
     }
+
+    public void setOp(Operation op) throws HyracksDataException {
+        switch (op) {
+            case DELETE:
+                logRecord.setNewOp(DELETE_OP);
+                break;
+            case INSERT:
+                logRecord.setNewOp(INSERT_OP);
+                break;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
new file mode 100644
index 0000000..49cea94
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+
+public class LockThenSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
+
+    public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
+            ITransactionContext txnCtx) {
+        super(datasetId, entityIdFields, txnCtx, lockManager);
+    }
+
+    @Override
+    public boolean proceed(ITupleReference tuple) throws HyracksDataException {
+        return true;
+    }
+
+    @Override
+    public void reconcile(ITupleReference tuple) throws HyracksDataException {
+    }
+
+    @Override
+    public void cancel(ITupleReference tuple) throws HyracksDataException {
+    }
+
+    @Override
+    public void complete(ITupleReference tuple) throws HyracksDataException {
+    }
+
+    @Override
+    public void before(ITupleReference tuple) throws HyracksDataException {
+        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+        try {
+            lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
new file mode 100644
index 0000000..6bfb6cd
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+
+public class LockThenSearchOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements ISearchOperationCallbackFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public LockThenSearchOperationCallbackFactory(JobId jobId, int datasetId, int[] entityIdFields,
+            ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
+        super(jobId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+    }
+
+    @Override
+    public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+            throws HyracksDataException {
+        ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
+        try {
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+            return new LockThenSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
+                    txnCtx);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index 944f07e..b2477cd 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -31,8 +31,8 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 /**
  * Assumes LSM-BTrees as primary indexes. Implements try/locking and unlocking on primary keys.
  */
-public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback implements
-        ISearchOperationCallback {
+public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback
+        implements ISearchOperationCallback {
 
     public PrimaryIndexInstantSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
             ITransactionContext txnCtx) {
@@ -73,4 +73,8 @@ public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperatio
             throw new HyracksDataException(e);
         }
     }
+
+    @Override
+    public void before(ITupleReference tuple) throws HyracksDataException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 5d6349d..3c34153 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -33,8 +33,8 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
  * Assumes LSM-BTrees as primary indexes.
  * Performs locking on primary keys, and also logs before/after images.
  */
-public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback implements
-        IModificationOperationCallback {
+public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
+        implements IModificationOperationCallback {
 
     public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, byte resourceType,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index 2d22879..9532f9e 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -39,6 +39,11 @@ public class PrimaryIndexSearchOperationCallback extends AbstractOperationCallba
     }
 
     @Override
+    public void before(ITupleReference tuple) throws HyracksDataException {
+        //no op
+    }
+
+    @Override
     public boolean proceed(ITupleReference tuple) throws HyracksDataException {
         try {
             return lockManager.tryLock(datasetId, -1, LockMode.S, txnCtx);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index cd0c41f..250e28d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -33,8 +33,8 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
  * We assume that the modification of the corresponding primary index has already taken an appropriate lock.
  * This callback performs logging of the before and/or after images for secondary indexes.
  */
-public class SecondaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback implements
-        IModificationOperationCallback {
+public class SecondaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
+        implements IModificationOperationCallback {
 
     protected final IndexOperation oldOp;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
index 59924d3..ac5f4d4 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -27,14 +27,19 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 /**
  * Secondary index searches perform no locking at all.
  */
-public class SecondaryIndexSearchOperationCallback extends AbstractOperationCallback implements
-        ISearchOperationCallback {
+public class SecondaryIndexSearchOperationCallback extends AbstractOperationCallback
+        implements ISearchOperationCallback {
 
     public SecondaryIndexSearchOperationCallback() {
         super(-1, null, null, null);
     }
 
     @Override
+    public void before(ITupleReference tuple) throws HyracksDataException {
+        // Do nothing
+    }
+
+    @Override
     public boolean proceed(ITupleReference tuple) throws HyracksDataException {
         return true;
     }
@@ -53,5 +58,4 @@ public class SecondaryIndexSearchOperationCallback extends AbstractOperationCall
     public void complete(ITupleReference tuple) throws HyracksDataException {
         // Do nothing.
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
index b27daea..69aad24 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
@@ -34,8 +34,8 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
  * The "before" and "found" method in this callback is empty so that no locking is requested for accessing a temporary
  * dataset and no write-ahead log is written for update operations.
  */
-public class TempDatasetIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback implements
-        IModificationOperationCallback {
+public class TempDatasetIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
+        implements IModificationOperationCallback {
 
     public TempDatasetIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,