You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/01 09:31:21 UTC
[02/13] incubator-asterixdb git commit: Add Support for Upsert
Operation
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
index b1fbf7c..d4c93ff 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateDescriptor.java
@@ -51,14 +51,9 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
* Multiple format strings can be used by separating them using <b>|(bar)</b>, and the parsing will be successful only when the format string has the <b>exact</b> match with the given data string. This means that a time string like <it>08:23:12 AM</it> will not be valid for the format string <it>h:m:s</it> as there is no AM/PM format character in the format string.
*/
public class ParseDateDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PARSE_DATE;
-
- private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
-
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
@@ -101,20 +96,21 @@ public class ParseDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
eval1.evaluate(tuple);
try {
- if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
- || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (argOut0.getByteArray()[0] != SER_STRING_TYPE_TAG
- || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": expects two strings but got ("
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
- + ", "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
- + ")");
+ if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG
+ || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ throw new AlgebricksException(
+ getIdentifier().getName() + ": expects two strings but got ("
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+ argOut0.getByteArray()[0])
+ + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut1.getByteArray()[0])
+ + ")");
}
utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength() - 1);
int start0 = utf8Ptr.getCharStartOffset();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
index 2a7be96..bdd0783 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseDateTimeDescriptor.java
@@ -46,14 +46,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class ParseDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PARSE_DATETIME;
-
- private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
-
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
@@ -96,26 +91,27 @@ public class ParseDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
eval1.evaluate(tuple);
try {
- if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
- || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (argOut0.getByteArray()[0] != SER_STRING_TYPE_TAG
- || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": expects two strings but got ("
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
- + ", "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
- + ")");
+ if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG
+ || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ throw new AlgebricksException(
+ getIdentifier().getName() + ": expects two strings but got ("
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+ argOut0.getByteArray()[0])
+ + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut1.getByteArray()[0])
+ + ")");
}
- utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength()-1);
+ utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength() - 1);
int start0 = utf8Ptr.getCharStartOffset();
int length0 = utf8Ptr.getUTF8Length();
- utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength()-1);
+ utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() - 1);
int start1 = utf8Ptr.getCharStartOffset();
int length1 = utf8Ptr.getUTF8Length();
long chronon = 0;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
index 007ddca..db6e8b3 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/ParseTimeDescriptor.java
@@ -46,12 +46,8 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class ParseTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PARSE_TIME;
-
- private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -96,26 +92,27 @@ public class ParseTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
eval1.evaluate(tuple);
try {
- if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
- || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (argOut0.getByteArray()[0] != SER_STRING_TYPE_TAG
- || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": expects two strings but got ("
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
- + ", "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
- + ")");
+ if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG
+ || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ throw new AlgebricksException(
+ getIdentifier().getName() + ": expects two strings but got ("
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+ argOut0.getByteArray()[0])
+ + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut1.getByteArray()[0])
+ + ")");
}
- utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength()-1);
+ utf8Ptr.set(argOut0.getByteArray(), 1, argOut0.getLength() - 1);
int start0 = utf8Ptr.getCharStartOffset();
int length0 = utf8Ptr.getUTF8Length();
- utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() -1);
+ utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() - 1);
int start1 = utf8Ptr.getCharStartOffset();
int length1 = utf8Ptr.getUTF8Length();
long chronon = 0;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
index d731fff..d15534d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java
@@ -46,13 +46,8 @@ import org.apache.hyracks.util.string.UTF8StringUtil;
import org.apache.hyracks.util.string.UTF8StringWriter;
public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PRINT_DATE;
-
- private final static byte SER_DATE_TYPE_TAG = ATypeTag.DATE.serialize();
- private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -94,20 +89,21 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
eval1.evaluate(tuple);
try {
- if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
- || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (argOut0.getByteArray()[0] != SER_DATE_TYPE_TAG
- || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": expects (DATE, STRING) but got ("
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
- + ", "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
- + ")");
+ if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_DATE_TYPE_TAG
+ || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ throw new AlgebricksException(
+ getIdentifier().getName() + ": expects (DATE, STRING) but got ("
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+ argOut0.getByteArray()[0])
+ + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut1.getByteArray()[0])
+ + ")");
}
long chronon = ADateSerializerDeserializer.getChronon(argOut0.getByteArray(), 1)
@@ -118,7 +114,7 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor
DT_UTILS.printDateTime(chronon, 0, argOut1.getByteArray(), 1 + offset, formatLength, sbder,
DateTimeParseMode.DATE_ONLY);
- out.writeByte(ATypeTag.STRING.serialize());
+ out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
utf8Writer.writeUTF8(sbder.toString(), out);
} catch (IOException ex) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
index a31e1e5..ba35a42 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java
@@ -45,13 +45,8 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.util.string.UTF8StringWriter;
public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PRINT_DATETIME;
-
- private final static byte SER_DATETIME_TYPE_TAG = ATypeTag.DATETIME.serialize();
- private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -94,20 +89,21 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
eval1.evaluate(tuple);
try {
- if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
- || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (argOut0.getByteArray()[0] != SER_DATETIME_TYPE_TAG
- || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": expects (DATETIME, STRING) but got ("
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
- + ", "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
- + ")");
+ if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_DATETIME_TYPE_TAG
+ || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ throw new AlgebricksException(
+ getIdentifier().getName() + ": expects (DATETIME, STRING) but got ("
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+ argOut0.getByteArray()[0])
+ + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut1.getByteArray()[0])
+ + ")");
}
long chronon = ADateTimeSerializerDeserializer.getChronon(argOut0.getByteArray(), 1);
utf8Ptr.set(argOut1.getByteArray(), 1, argOut1.getLength() - 1);
@@ -116,7 +112,7 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri
DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
formatLength, sbder, DateTimeParseMode.DATETIME);
- out.writeByte(ATypeTag.STRING.serialize());
+ out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
utf8Writer.writeUTF8(sbder.toString(), out);
} catch (IOException ex) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
index 961c44d..f152401 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java
@@ -45,13 +45,8 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.util.string.UTF8StringWriter;
public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.PRINT_TIME;
-
- private final static byte SER_TIME_TYPE_TAG = ATypeTag.TIME.serialize();
- private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance();
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -94,20 +89,21 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
eval1.evaluate(tuple);
try {
- if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
- || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (argOut0.getByteArray()[0] != SER_TIME_TYPE_TAG
- || argOut1.getByteArray()[0] != SER_STRING_TYPE_TAG) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": expects (TIME, STRING) but got ("
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
- + ", "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0])
- + ")");
+ if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_TIME_TYPE_TAG
+ || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ throw new AlgebricksException(
+ getIdentifier().getName() + ": expects (TIME, STRING) but got ("
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+ argOut0.getByteArray()[0])
+ + ", " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut1.getByteArray()[0])
+ + ")");
}
long chronon = ATimeSerializerDeserializer.getChronon(argOut0.getByteArray(), 1);
@@ -117,7 +113,7 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor
DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(),
formatLength, sbder, DateTimeParseMode.TIME_ONLY);
- out.writeByte(ATypeTag.STRING.serialize());
+ out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
writer.writeUTF8(sbder.toString(), out);
} catch (IOException ex) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
index 741eeb1..384cfd7 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromDatetimeDescriptor.java
@@ -47,11 +47,6 @@ public class TimeFromDatetimeDescriptor extends AbstractScalarFunctionDynamicDes
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.TIME_FROM_DATETIME;
-
- // allowed input types
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte SER_DATETIME_TYPE_TAG = ATypeTag.DATETIME.serialize();
-
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
@@ -93,18 +88,17 @@ public class TimeFromDatetimeDescriptor extends AbstractScalarFunctionDynamicDes
argOut.reset();
eval.evaluate(tuple);
try {
- if (argOut.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
} else {
- if (argOut.getByteArray()[0] != SER_DATETIME_TYPE_TAG) {
+ if (argOut.getByteArray()[0] != ATypeTag.SERIALIZED_DATETIME_TYPE_TAG) {
throw new AlgebricksException(
- FID.getName()
- + ": expects input type DATETIME/NULL but got "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut
- .getByteArray()[0]));
+ FID.getName() + ": expects input type DATETIME/NULL but got "
+ + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut.getByteArray()[0]));
}
- long datetimeChronon = ADateTimeSerializerDeserializer.getChronon(
- argOut.getByteArray(), 1);
+ long datetimeChronon = ADateTimeSerializerDeserializer.getChronon(argOut.getByteArray(),
+ 1);
int timeChronon = (int) (datetimeChronon % GregorianCalendarSystem.CHRONON_OF_DAY);
if (timeChronon < 0) {
timeChronon += GregorianCalendarSystem.CHRONON_OF_DAY;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
index 6170726..3744873 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/TimeFromUnixTimeInMsDescriptor.java
@@ -42,13 +42,8 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class TimeFromUnixTimeInMsDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private final static long serialVersionUID = 1L;
public final static FunctionIdentifier FID = AsterixBuiltinFunctions.TIME_FROM_UNIX_TIME_IN_MS;
-
- // allowed input types
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-
public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
@@ -89,7 +84,7 @@ public class TimeFromUnixTimeInMsDescriptor extends AbstractScalarFunctionDynami
argOut.reset();
eval.evaluate(tuple);
try {
- if (argOut.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
} else {
aTime.setValue(ATypeHierarchy.getIntegerValue(argOut.getByteArray(), 0));
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
index f23fa79..f75665d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/YearMonthDurationComparatorDecriptor.java
@@ -42,15 +42,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class YearMonthDurationComparatorDecriptor extends AbstractScalarFunctionDynamicDescriptor {
-
private final static long serialVersionUID = 1L;
public final static FunctionIdentifier GREATER_THAN_FID = AsterixBuiltinFunctions.YEAR_MONTH_DURATION_GREATER_THAN;
public final static FunctionIdentifier LESS_THAN_FID = AsterixBuiltinFunctions.YEAR_MONTH_DURATION_LESS_THAN;
-
- // allowed input types
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte SER_DURATION_TYPE_TAG = ATypeTag.DURATION.serialize();
-
private final boolean isGreaterThan;
private YearMonthDurationComparatorDecriptor(boolean isGreaterThan) {
@@ -105,29 +99,31 @@ public class YearMonthDurationComparatorDecriptor extends AbstractScalarFunction
eval1.evaluate(tuple);
try {
- if (argOut0.getByteArray()[0] == SER_NULL_TYPE_TAG
- || argOut1.getByteArray()[0] == SER_NULL_TYPE_TAG) {
+ if (argOut0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argOut1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (argOut0.getByteArray()[0] != SER_DURATION_TYPE_TAG
- || argOut1.getByteArray()[0] != SER_DURATION_TYPE_TAG) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": expects type NULL/DURATION, NULL/DURATION but got "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut0.getByteArray()[0])
- + " and "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut1.getByteArray()[0]));
+ if (argOut0.getByteArray()[0] != ATypeTag.SERIALIZED_DURATION_TYPE_TAG
+ || argOut1.getByteArray()[0] != ATypeTag.SERIALIZED_DURATION_TYPE_TAG) {
+ throw new AlgebricksException(
+ getIdentifier().getName()
+ + ": expects type NULL/DURATION, NULL/DURATION but got "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(
+ argOut0.getByteArray()[0])
+ + " and " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argOut1.getByteArray()[0]));
}
if ((ADurationSerializerDeserializer.getDayTime(argOut0.getByteArray(), 1) != 0)
|| (ADurationSerializerDeserializer.getDayTime(argOut1.getByteArray(), 1) != 0)) {
- throw new AlgebricksException(getIdentifier().getName()
- + ": only year-month durations are allowed.");
+ throw new AlgebricksException(
+ getIdentifier().getName() + ": only year-month durations are allowed.");
}
- if (ADurationSerializerDeserializer.getYearMonth(argOut0.getByteArray(), 1) > ADurationSerializerDeserializer
- .getYearMonth(argOut1.getByteArray(), 1)) {
+ if (ADurationSerializerDeserializer.getYearMonth(argOut0.getByteArray(),
+ 1) > ADurationSerializerDeserializer.getYearMonth(argOut1.getByteArray(), 1)) {
boolSerde.serialize(isGreaterThan ? ABoolean.TRUE : ABoolean.FALSE, out);
} else {
boolSerde.serialize(isGreaterThan ? ABoolean.FALSE : ABoolean.TRUE, out);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
new file mode 100644
index 0000000..3db3de2
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMInvertedIndexUpsertOperatorDescriptor
+ extends AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final int[] prevFieldPermutation;
+
+ public AsterixLSMInvertedIndexUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+ IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
+ IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
+ IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
+ IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
+ int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory,
+ ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
+ String indexName, int[] prevFieldPermutation) {
+ super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+ fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory,
+ modificationOpCallbackFactory, indexName);
+ this.prevFieldPermutation = prevFieldPermutation;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AsterixLSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, prevFieldPermutation);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
new file mode 100644
index 0000000..f35f4d6
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INullWriter;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+
+public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+
+ private PermutingFrameTupleReference key;
+ private MultiComparator keySearchCmp;
+ private ArrayTupleBuilder nullTupleBuilder;
+ private INullWriter nullWriter;
+ private ArrayTupleBuilder tb;
+ private DataOutput dos;
+ private LSMBTree lsmIndex;
+ private RangePredicate searchPred;
+ private IIndexCursor cursor;
+ private ITupleReference prevTuple;
+ private int numOfPrimaryKeys;
+ boolean isFiltered = false;
+ private ArrayTupleReference prevTupleWithFilter = new ArrayTupleReference();
+ private ArrayTupleBuilder prevRecWithPKWithFilterValue;
+ private ARecordType recordType;
+ private int presetFieldIndex = -1;
+ private ARecordPointable recPointable;
+ private DataOutput prevDos;
+
+ public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
+ ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+ super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
+ // initialize nullWriter
+ this.nullWriter = opDesc.getNullWriterFactory().createNullWriter();
+ // The search key should only have the primary index and not use the permutations.
+ this.key = new PermutingFrameTupleReference();
+ int[] searchKeyPermutations = new int[numOfPrimaryKeys];
+ for (int i = 0; i < searchKeyPermutations.length; i++) {
+ searchKeyPermutations[i] = fieldPermutation[i];
+ }
+ key.setFieldPermutation(searchKeyPermutations);
+ this.numOfPrimaryKeys = numOfPrimaryKeys;
+ if (fieldPermutation.length > numOfPrimaryKeys + 1) {
+ isFiltered = true;
+ this.recordType = recordType;
+ this.presetFieldIndex = filterFieldIndex;
+ this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
+ this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length);
+ this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
+ }
+ }
+
+ // we have the permutation which has [pk locations, record location, optional:filter-location]
+ // the index -> we don't need anymore data?
+ // we need to use the primary index opTracker and secondary indexes callbacks for insert/delete since the lock would
+ // have been obtained through searchForUpsert operation
+
+ @Override
+ public void open() throws HyracksDataException {
+ RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+ accessor = new FrameTupleAccessor(inputRecDesc);
+ writeBuffer = new VSizeFrame(ctx);
+ writer.open();
+ indexHelper.open();
+ lsmIndex = (LSMBTree) indexHelper.getIndexInstance();
+
+ try {
+ nullTupleBuilder = new ArrayTupleBuilder(1);
+ DataOutput out = nullTupleBuilder.getDataOutput();
+ try {
+ nullWriter.writeNull(out);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ nullTupleBuilder.addFieldEndOffset();
+ searchPred = createSearchPredicate();
+ tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
+ dos = tb.getDataOutput();
+ appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+ modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
+ indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
+
+ indexAccessor = lsmIndex.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
+ .createSearchOperationCallback(indexHelper.getResourceID(), ctx));
+ cursor = createCursor();
+ frameTuple = new FrameTupleReference();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
+ } catch (Exception e) {
+ indexHelper.close();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private void resetSearchPredicate(int tupleIndex) {
+ key.reset(accessor, tupleIndex);
+ }
+
+ protected void writeOutput(int tupleIndex) throws Exception {
+ tb.reset();
+ frameTuple.reset(accessor, tupleIndex);
+ for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+ dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+ tb.addFieldEndOffset();
+ }
+ if (prevTuple != null) {
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
+ prevTuple.getFieldLength(numOfPrimaryKeys));
+ tb.addFieldEndOffset();
+ // if with filters, append the filter
+ if (isFiltered) {
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1), prevTuple.getFieldStart(numOfPrimaryKeys + 1),
+ prevTuple.getFieldLength(numOfPrimaryKeys + 1));
+ tb.addFieldEndOffset();
+ }
+ } else {
+ addNullField();
+ // if with filters, append null
+ if (isFiltered) {
+ addNullField();
+ }
+ }
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+ }
+
+ private void addNullField() throws IOException {
+ dos.write(nullTupleBuilder.getByteArray());
+ tb.addFieldEndOffset();
+ }
+
+ //TODO: use tryDelete/tryInsert in order to prevent deadlocks
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
+ int tupleCount = accessor.getTupleCount();
+
+ try {
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ resetSearchPredicate(i);
+ cursor.reset();
+ lsmAccessor.search(cursor, searchPred);
+ if (cursor.hasNext()) {
+ cursor.next();
+ prevTuple = cursor.getTuple();
+ cursor.reset();
+ modCallback.setOp(Operation.DELETE);
+ if (isFiltered) {
+ prevTuple = getPrevTupleWithFilter(prevTuple);
+ }
+ if (i == 0) {
+ lsmAccessor.delete(prevTuple);
+ } else {
+ lsmAccessor.forceDelete(prevTuple);
+ }
+ } else {
+ prevTuple = null;
+ }
+ modCallback.setOp(Operation.INSERT);
+ if (prevTuple == null && i == 0) {
+ lsmAccessor.insert(tuple);
+ } else {
+ lsmAccessor.forceInsert(tuple);
+ }
+ writeOutput(i);
+ }
+ if (tupleCount > 0) {
+ // All tuples has to move forward to maintain the correctness of the transaction pipeline
+ appender.write(writer, true);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private ITupleReference getPrevTupleWithFilter(ITupleReference prevTuple) throws IOException, AsterixException {
+ prevRecWithPKWithFilterValue.reset();
+ for (int i = 0; i < prevTuple.getFieldCount(); i++) {
+ prevDos.write(prevTuple.getFieldData(i), prevTuple.getFieldStart(i), prevTuple.getFieldLength(i));
+ prevRecWithPKWithFilterValue.addFieldEndOffset();
+ }
+ recPointable.set(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
+ prevTuple.getFieldLength(numOfPrimaryKeys));
+ // copy the field data from prevTuple
+ prevDos.write(recPointable.getClosedFieldType(recordType, presetFieldIndex).getTypeTag().serialize());
+ prevDos.write(recPointable.getByteArray(), recPointable.getClosedFieldOffset(recordType, presetFieldIndex),
+ recPointable.getClosedFieldSize(recordType, presetFieldIndex));
+ prevRecWithPKWithFilterValue.addFieldEndOffset();
+ // prepare the tuple
+ prevTupleWithFilter.reset(prevRecWithPKWithFilterValue.getFieldEndOffsets(),
+ prevRecWithPKWithFilterValue.getByteArray());
+ return prevTupleWithFilter;
+ }
+
+ private RangePredicate createSearchPredicate() {
+ keySearchCmp = BTreeUtils.getSearchMultiComparator(lsmIndex.getComparatorFactories(), key);
+ return new RangePredicate(key, key, true, true, keySearchCmp, keySearchCmp, null, null);
+ }
+
+ protected IIndexCursor createCursor() {
+ return indexAccessor.createSearchCursor(false);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ cursor.close();
+ writer.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ indexHelper.close();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
new file mode 100644
index 0000000..65dc83f
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+
+/**
+ * This operator node is used for secondary indexes with upsert operations.
+ * It works in the following way:
+ * For each incoming tuple
+ * -If old secondary keys == new secondary keys
+ * --do nothing
+ * -else
+ * --If old secondary keys are null?
+ * ---do nothing
+ * --else
+ * ---delete old secondary keys
+ * --If new keys are null?
+ * ---do nothing
+ * --else
+ * ---insert new keys
+ */
+public class AsterixLSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+
+ private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference();
+ private int numberOfFields;
+ private boolean isNewNull = false;
+ private boolean isPrevValueNull = false;
+
+ public AsterixLSMSecondaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
+ int[] prevValuePermutation) {
+ super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
+ this.prevValueTuple.setFieldPermutation(prevValuePermutation);
+ this.numberOfFields = prevValuePermutation.length;
+ }
+
+ public static boolean equals(byte[] a, int aOffset, int aLength, byte[] b, int bOffset, int bLength) {
+ if (a.length != b.length) {
+ return false;
+ }
+ for (int i = 0; i < a.length; i++) {
+ if (a[aOffset + i] != b[bOffset + i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean equalTuples(PermutingFrameTupleReference t1, PermutingFrameTupleReference t2, int numOfFields)
+ throws HyracksDataException {
+ byte[] t1Data = t1.getFieldData(0);
+ byte[] t2Data = t2.getFieldData(0);
+ for (int i = 0; i < numOfFields; i++) {
+ if (!equals(t1Data, t1.getFieldStart(i), t1.getFieldLength(i), t2Data, t2.getFieldStart(i),
+ t2.getFieldLength(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isNull(PermutingFrameTupleReference t1) {
+ return t1.getFieldData(0)[t1.getFieldStart(0)] == ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ try {
+ // if both previous value and new value are null, then we skip
+ tuple.reset(accessor, i);
+ prevValueTuple.reset(accessor, i);
+ isNewNull = isNull(tuple);
+ isPrevValueNull = isNull(prevValueTuple);
+ if (isNewNull && isPrevValueNull) {
+ continue;
+ }
+ // At least, one is not null
+ // If they are equal, then we skip
+ if (equalTuples(tuple, prevValueTuple, numberOfFields)) {
+ continue;
+ }
+ if (!isPrevValueNull) {
+ // previous is not null, we need to delete previous
+ modCallback.setOp(Operation.DELETE);
+ lsmAccessor.forceDelete(prevValueTuple);
+ }
+ if (!isNewNull) {
+ // new is not null, we need to insert the new value
+ modCallback.setOp(Operation.INSERT);
+ lsmAccessor.forceInsert(tuple);
+ }
+
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ // No partial flushing was necessary. Forward entire frame.
+ writeBuffer.ensureFrameSize(buffer.capacity());
+ FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+ FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
new file mode 100644
index 0000000..803e15d
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMTreeUpsertOperatorDescriptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMTreeUpsertOperatorDescriptor extends AsterixLSMTreeInsertDeleteOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final int[] prevValuePermutation;
+ private ARecordType type;
+ private int filterIndex = -1;
+
+ public AsterixLSMTreeUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
+ IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
+ boolean isPrimary, String indexName, INullWriterFactory nullWriterFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider,
+ ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation) {
+ super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
+ dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, nullWriterFactory,
+ modificationOpCallbackProvider, searchOpCallbackProvider);
+ this.prevValuePermutation = prevValuePermutation;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return isPrimary()
+ ? new AsterixLSMPrimaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, comparatorFactories.length, type, filterIndex)
+ : new AsterixLSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, prevValuePermutation);
+ }
+
+ public void setType(ARecordType type) {
+ this.type = type;
+ }
+
+ public void setFilterIndex(int filterIndex) {
+ this.filterIndex = filterIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
index bb87cf4..88d9a4e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
@@ -47,14 +47,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SubsetCollectionDescriptor extends AbstractUnnestingFunctionDynamicDescriptor {
-
private static final long serialVersionUID = 1L;
-
- private final static byte SER_ORDEREDLIST_TYPE_TAG = ATypeTag.ORDEREDLIST.serialize();
- private final static byte SER_UNORDEREDLIST_TYPE_TAG = ATypeTag.UNORDEREDLIST.serialize();
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SubsetCollectionDescriptor();
}
@@ -105,16 +100,17 @@ public class SubsetCollectionDescriptor extends AbstractUnnestingFunctionDynamic
byte[] serList = inputVal.getByteArray();
- if (serList[0] == SER_NULL_TYPE_TAG) {
+ if (serList[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
return;
}
- if (serList[0] != SER_ORDEREDLIST_TYPE_TAG && serList[0] != SER_UNORDEREDLIST_TYPE_TAG) {
+ if (serList[0] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
+ && serList[0] != ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
throw new AlgebricksException("Subset-collection is not defined for values of type"
+ EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[0]));
}
- if (serList[0] == SER_ORDEREDLIST_TYPE_TAG)
+ if (serList[0] == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG)
numItemsMax = AOrderedListSerializerDeserializer.getNumberOfItems(serList);
else
numItemsMax = AUnorderedListSerializerDeserializer.getNumberOfItems(serList);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index a07a109..3b5630f 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -26,12 +26,16 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
public abstract class AbstractIndexModificationOperationCallback extends AbstractOperationCallback {
+ private static final byte INSERT_OP = (byte) IndexOperation.INSERT.ordinal();
+ private static final byte DELETE_OP = (byte) IndexOperation.DELETE.ordinal();
protected final long resourceId;
protected final byte resourceType;
protected final IndexOperation indexOp;
@@ -72,4 +76,15 @@ public abstract class AbstractIndexModificationOperationCallback extends Abstrac
logRecord.computeAndSetLogSize();
txnSubsystem.getLogManager().log(logRecord);
}
+
+ public void setOp(Operation op) throws HyracksDataException {
+ switch (op) {
+ case DELETE:
+ logRecord.setNewOp(DELETE_OP);
+ break;
+ case INSERT:
+ logRecord.setNewOp(INSERT_OP);
+ break;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
new file mode 100644
index 0000000..49cea94
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.AbstractOperationCallback;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+
+public class LockThenSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
+
+ public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
+ ITransactionContext txnCtx) {
+ super(datasetId, entityIdFields, txnCtx, lockManager);
+ }
+
+ @Override
+ public boolean proceed(ITupleReference tuple) throws HyracksDataException {
+ return true;
+ }
+
+ @Override
+ public void reconcile(ITupleReference tuple) throws HyracksDataException {
+ }
+
+ @Override
+ public void cancel(ITupleReference tuple) throws HyracksDataException {
+ }
+
+ @Override
+ public void complete(ITupleReference tuple) throws HyracksDataException {
+ }
+
+ @Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+ int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+ try {
+ lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
new file mode 100644
index 0000000..6bfb6cd
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.opcallbacks;
+
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+
+public class LockThenSearchOperationCallbackFactory extends AbstractOperationCallbackFactory
+ implements ISearchOperationCallbackFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public LockThenSearchOperationCallbackFactory(JobId jobId, int datasetId, int[] entityIdFields,
+ ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
+ super(jobId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+ }
+
+ @Override
+ public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
+ try {
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+ return new LockThenSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
+ txnCtx);
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index 944f07e..b2477cd 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -31,8 +31,8 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
/**
* Assumes LSM-BTrees as primary indexes. Implements try/locking and unlocking on primary keys.
*/
-public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback implements
- ISearchOperationCallback {
+public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperationCallback
+ implements ISearchOperationCallback {
public PrimaryIndexInstantSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
ITransactionContext txnCtx) {
@@ -73,4 +73,8 @@ public class PrimaryIndexInstantSearchOperationCallback extends AbstractOperatio
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 5d6349d..3c34153 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -33,8 +33,8 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
* Assumes LSM-BTrees as primary indexes.
* Performs locking on primary keys, and also logs before/after images.
*/
-public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback implements
- IModificationOperationCallback {
+public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
+ implements IModificationOperationCallback {
public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, byte resourceType,
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index 2d22879..9532f9e 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -39,6 +39,11 @@ public class PrimaryIndexSearchOperationCallback extends AbstractOperationCallba
}
@Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+ //no op
+ }
+
+ @Override
public boolean proceed(ITupleReference tuple) throws HyracksDataException {
try {
return lockManager.tryLock(datasetId, -1, LockMode.S, txnCtx);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index cd0c41f..250e28d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -33,8 +33,8 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
* We assume that the modification of the corresponding primary index has already taken an appropriate lock.
* This callback performs logging of the before and/or after images for secondary indexes.
*/
-public class SecondaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback implements
- IModificationOperationCallback {
+public class SecondaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
+ implements IModificationOperationCallback {
protected final IndexOperation oldOp;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
index 59924d3..ac5f4d4 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -27,14 +27,19 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
/**
* Secondary index searches perform no locking at all.
*/
-public class SecondaryIndexSearchOperationCallback extends AbstractOperationCallback implements
- ISearchOperationCallback {
+public class SecondaryIndexSearchOperationCallback extends AbstractOperationCallback
+ implements ISearchOperationCallback {
public SecondaryIndexSearchOperationCallback() {
super(-1, null, null, null);
}
@Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+ // Do nothing
+ }
+
+ @Override
public boolean proceed(ITupleReference tuple) throws HyracksDataException {
return true;
}
@@ -53,5 +58,4 @@ public class SecondaryIndexSearchOperationCallback extends AbstractOperationCall
public void complete(ITupleReference tuple) throws HyracksDataException {
// Do nothing.
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
index b27daea..69aad24 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
@@ -34,8 +34,8 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
* The "before" and "found" method in this callback is empty so that no locking is requested for accessing a temporary
* dataset and no write-ahead log is written for update operations.
*/
-public class TempDatasetIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback implements
- IModificationOperationCallback {
+public class TempDatasetIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
+ implements IModificationOperationCallback {
public TempDatasetIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,