You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/02/13 03:15:34 UTC
[05/24] incubator-asterixdb git commit: Move to non-copy-based
evaluator interfaces for all function implementations,
including: - scalar functions, - aggregate functions,
- running aggregate functions, - unnesting functions
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
index 7bc6aef..dd70d1e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
@@ -34,25 +34,27 @@ import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class FieldAccessByIndexEvalFactory implements ICopyEvaluatorFactory {
+public class FieldAccessByIndexEvalFactory implements IScalarEvaluatorFactory {
private static final long serialVersionUID = 1L;
- private ICopyEvaluatorFactory recordEvalFactory;
- private ICopyEvaluatorFactory fieldIndexEvalFactory;
+ private IScalarEvaluatorFactory recordEvalFactory;
+ private IScalarEvaluatorFactory fieldIndexEvalFactory;
private int nullBitmapSize;
private ARecordType recordType;
- public FieldAccessByIndexEvalFactory(ICopyEvaluatorFactory recordEvalFactory,
- ICopyEvaluatorFactory fieldIndexEvalFactory, ARecordType recordType) {
+ public FieldAccessByIndexEvalFactory(IScalarEvaluatorFactory recordEvalFactory,
+ IScalarEvaluatorFactory fieldIndexEvalFactory, ARecordType recordType) {
this.recordEvalFactory = recordEvalFactory;
this.fieldIndexEvalFactory = fieldIndexEvalFactory;
this.recordType = recordType;
@@ -60,16 +62,15 @@ public class FieldAccessByIndexEvalFactory implements ICopyEvaluatorFactory {
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
-
- return new ICopyEvaluator() {
-
- private DataOutput out = output.getDataOutput();
-
- private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
- private ArrayBackedValueStorage outInput1 = new ArrayBackedValueStorage();
- private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
- private ICopyEvaluator eval1 = fieldIndexEvalFactory.createEvaluator(outInput1);
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private DataOutput out = resultStorage.getDataOutput();
+
+ private IPointable inputArg0 = new VoidPointable();
+ private IPointable inputArg1 = new VoidPointable();
+ private IScalarEvaluator eval0 = recordEvalFactory.createScalarEvaluator(ctx);
+ private IScalarEvaluator eval1 = fieldIndexEvalFactory.createScalarEvaluator(ctx);
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
@@ -80,37 +81,39 @@ public class FieldAccessByIndexEvalFactory implements ICopyEvaluatorFactory {
private ATypeTag fieldValueTypeTag = ATypeTag.NULL;
/*
- * outInput0: the record
- * outInput1: the index
+ * inputArg0: the record
+ * inputArg1: the index
*
- * This method outputs into IDataOutputProvider output [field type tag (1 byte)][the field data]
+ * This method outputs into the IPointable result: [field type tag (1 byte)][the field data]
*/
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
try {
- outInput0.reset();
- eval0.evaluate(tuple);
- byte[] serRecord = outInput0.getByteArray();
+ resultStorage.reset();
+ eval0.evaluate(tuple, inputArg0);
+ byte[] serRecord = inputArg0.getByteArray();
+ int offset = inputArg0.getStartOffset();
- if (serRecord[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ if (serRecord[offset] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
return;
}
- if (serRecord[0] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ if (serRecord[offset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
throw new AlgebricksException("Field accessor is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[0]));
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[offset]));
}
- outInput1.reset();
- eval1.evaluate(tuple);
- fieldIndex = IntegerPointable.getInteger(outInput1.getByteArray(), 1);
+ eval1.evaluate(tuple, inputArg1);
+ fieldIndex = IntegerPointable.getInteger(inputArg1.getByteArray(), inputArg1.getStartOffset() + 1);
fieldValueType = recordType.getFieldTypes()[fieldIndex];
- fieldValueOffset = ARecordSerializerDeserializer.getFieldOffsetById(serRecord, fieldIndex,
+ fieldValueOffset = ARecordSerializerDeserializer.getFieldOffsetById(serRecord, offset, fieldIndex,
nullBitmapSize, recordType.isOpen());
if (fieldValueOffset == 0) {
// the field is null, we checked the null bit map
out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
return;
}
@@ -131,7 +134,7 @@ public class FieldAccessByIndexEvalFactory implements ICopyEvaluatorFactory {
out.writeByte(fieldValueTypeTag.serialize());
}
out.write(serRecord, fieldValueOffset, fieldValueLength);
-
+ result.set(resultStorage);
} catch (IOException e) {
throw new AlgebricksException(e);
} catch (AsterixException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java
index 0951de0..f11941e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public class FieldAccessByNameDescriptor extends AbstractScalarFunctionDynamicDescriptor {
@@ -40,7 +40,7 @@ public class FieldAccessByNameDescriptor extends AbstractScalarFunctionDynamicDe
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+ public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
return new FieldAccessByNameEvalFactory(args[0], args[1]);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
index 66bedf0..d11c91e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
@@ -31,36 +31,39 @@ import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class FieldAccessByNameEvalFactory implements ICopyEvaluatorFactory {
+public class FieldAccessByNameEvalFactory implements IScalarEvaluatorFactory {
private static final long serialVersionUID = 1L;
- private ICopyEvaluatorFactory recordEvalFactory;
- private ICopyEvaluatorFactory fldNameEvalFactory;
+ private IScalarEvaluatorFactory recordEvalFactory;
+ private IScalarEvaluatorFactory fldNameEvalFactory;
- public FieldAccessByNameEvalFactory(ICopyEvaluatorFactory recordEvalFactory,
- ICopyEvaluatorFactory fldNameEvalFactory) {
+ public FieldAccessByNameEvalFactory(IScalarEvaluatorFactory recordEvalFactory,
+ IScalarEvaluatorFactory fldNameEvalFactory) {
this.recordEvalFactory = recordEvalFactory;
this.fldNameEvalFactory = fldNameEvalFactory;
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
- private DataOutput out = output.getDataOutput();
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private DataOutput out = resultStorage.getDataOutput();
- private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
- private ArrayBackedValueStorage outInput1 = new ArrayBackedValueStorage();
- private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
- private ICopyEvaluator eval1 = fldNameEvalFactory.createEvaluator(outInput1);
+ private IPointable inputArg0 = new VoidPointable();
+ private IPointable inputArg1 = new VoidPointable();
+ private IScalarEvaluator eval0 = recordEvalFactory.createScalarEvaluator(ctx);
+ private IScalarEvaluator eval1 = fldNameEvalFactory.createScalarEvaluator(ctx);
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
@@ -69,38 +72,41 @@ public class FieldAccessByNameEvalFactory implements ICopyEvaluatorFactory {
private ATypeTag fieldValueTypeTag = ATypeTag.NULL;
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
-
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
try {
- outInput0.reset();
- eval0.evaluate(tuple);
- outInput1.reset();
- eval1.evaluate(tuple);
- byte[] serRecord = outInput0.getByteArray();
-
- if (serRecord[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ resultStorage.reset();
+ eval0.evaluate(tuple, inputArg0);
+ eval1.evaluate(tuple, inputArg1);
+ byte[] serRecord = inputArg0.getByteArray();
+ int serRecordOffset = inputArg0.getStartOffset();
+ int serRecordLen = inputArg0.getLength();
+
+ if (serRecord[serRecordOffset] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
return;
}
- if (serRecord[0] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ if (serRecord[serRecordOffset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
throw new AlgebricksException(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME.getName()
+ ": expects input type NULL or RECORD, but got "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[0]));
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[serRecordOffset]));
}
- byte[] serFldName = outInput1.getByteArray();
- fieldValueOffset = ARecordSerializerDeserializer.getFieldOffsetByName(serRecord, serFldName);
+ byte[] serFldName = inputArg1.getByteArray();
+ int serFldNameOffset = inputArg1.getStartOffset();
+ fieldValueOffset = ARecordSerializerDeserializer.getFieldOffsetByName(serRecord, serRecordOffset,
+ serRecordLen, serFldName, serFldNameOffset);
if (fieldValueOffset < 0) {
out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
return;
}
fieldValueTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[fieldValueOffset]);
fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, fieldValueOffset,
fieldValueTypeTag, true) + 1;
- out.write(serRecord, fieldValueOffset, fieldValueLength);
-
+ result.set(serRecord, fieldValueOffset, fieldValueLength);
} catch (IOException e) {
throw new AlgebricksException(e);
} catch (AsterixException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java
index 726a8b3..67e7027 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedDescriptor.java
@@ -26,7 +26,7 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public class FieldAccessNestedDescriptor extends AbstractScalarFunctionDynamicDescriptor {
@@ -51,7 +51,7 @@ public class FieldAccessNestedDescriptor extends AbstractScalarFunctionDynamicDe
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+ public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
return new FieldAccessNestedEvalFactory(args[0], recType, fldName);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
index 2f41816..762fdf3 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
@@ -19,27 +19,44 @@
package org.apache.asterix.runtime.evaluators.functions.records;
import java.io.DataOutput;
+import java.io.IOException;
import java.util.List;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class FieldAccessNestedEvalFactory implements ICopyEvaluatorFactory {
+public class FieldAccessNestedEvalFactory implements IScalarEvaluatorFactory {
private static final long serialVersionUID = 1L;
- private ICopyEvaluatorFactory recordEvalFactory;
+ private IScalarEvaluatorFactory recordEvalFactory;
private ARecordType recordType;
private List<String> fieldPath;
- public FieldAccessNestedEvalFactory(ICopyEvaluatorFactory recordEvalFactory, ARecordType recordType,
+ public FieldAccessNestedEvalFactory(IScalarEvaluatorFactory recordEvalFactory, ARecordType recordType,
List<String> fldName) {
this.recordEvalFactory = recordEvalFactory;
this.recordType = recordType;
@@ -48,30 +65,192 @@ public class FieldAccessNestedEvalFactory implements ICopyEvaluatorFactory {
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
- private final DataOutput out = output.getDataOutput();
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private final DataOutput out = resultStorage.getDataOutput();
private final ByteArrayAccessibleOutputStream subRecordTmpStream = new ByteArrayAccessibleOutputStream();
- private final ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
- private final ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
- private final ArrayBackedValueStorage[] abvsFields = new ArrayBackedValueStorage[fieldPath.size()];
- private final DataOutput[] doFields = new DataOutput[fieldPath.size()];
+ private final IPointable inputArg0 = new VoidPointable();
+ private final IScalarEvaluator eval0 = recordEvalFactory.createScalarEvaluator(ctx);
+ private final IPointable[] fieldPointables = new VoidPointable[fieldPath.size()];
private final RuntimeRecordTypeInfo[] recTypeInfos = new RuntimeRecordTypeInfo[fieldPath.size()];
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
{
- FieldAccessUtil.getFieldsAbvs(abvsFields, doFields, fieldPath);
+ generateFieldsPointables();
for (int index = 0; index < fieldPath.size(); ++index) {
recTypeInfos[index] = new RuntimeRecordTypeInfo();
}
}
+ @SuppressWarnings("unchecked")
+ private void generateFieldsPointables() throws AlgebricksException {
+ for (int i = 0; i < fieldPath.size(); i++) {
+ ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
+ DataOutput out = storage.getDataOutput();
+ AString as = new AString(fieldPath.get(i));
+ try {
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as,
+ out);
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ fieldPointables[i] = new VoidPointable();
+ fieldPointables[i].set(storage);
+ }
+ }
+
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
- FieldAccessUtil.evaluate(tuple, out, eval0, abvsFields, outInput0, subRecordTmpStream, recordType,
- recTypeInfos);
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+ try {
+ resultStorage.reset();
+ eval0.evaluate(tuple, inputArg0);
+ byte[] serRecord = inputArg0.getByteArray();
+ int offset = inputArg0.getStartOffset();
+ int start = offset;
+ int len = inputArg0.getLength();
+
+ if (serRecord[start] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
+ return;
+ }
+ if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ throw new AlgebricksException("Field accessor is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[start]));
+ }
+
+ int subFieldIndex = -1;
+ int subFieldOffset = -1;
+ int subFieldLength = -1;
+ int nullBitmapSize = -1;
+
+ IAType subType = recordType;
+ recTypeInfos[0].reset(recordType);
+
+ ATypeTag subTypeTag = ATypeTag.NULL;
+ boolean openField = false;
+ int pathIndex = 0;
+
+ // Moving through closed fields first.
+ for (; pathIndex < fieldPointables.length; pathIndex++) {
+ if (subType.getTypeTag().equals(ATypeTag.UNION)) {
+ //enforced SubType
+ subType = ((AUnionType) subType).getNullableType();
+ if (subType.getTypeTag().serialize() != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ throw new AlgebricksException(
+ "Field accessor is not defined for values of type " + subTypeTag);
+ }
+ if (subType.getTypeTag() == ATypeTag.RECORD) {
+ recTypeInfos[pathIndex].reset((ARecordType) subType);
+ }
+ }
+ subFieldIndex = recTypeInfos[pathIndex].getFieldIndex(fieldPointables[pathIndex].getByteArray(),
+ fieldPointables[pathIndex].getStartOffset() + 1,
+ fieldPointables[pathIndex].getLength() - 1);
+ if (subFieldIndex == -1) {
+ break;
+ }
+ nullBitmapSize = ARecordType.computeNullBitmapSize((ARecordType) subType);
+ subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetById(serRecord, start,
+ subFieldIndex, nullBitmapSize, ((ARecordType) subType).isOpen());
+ if (subFieldOffset == 0) {
+ // the field is null, we checked the null bit map
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
+ return;
+ }
+ subType = ((ARecordType) subType).getFieldTypes()[subFieldIndex];
+ if (subType.getTypeTag() == ATypeTag.RECORD && pathIndex + 1 < fieldPointables.length) {
+ // Move to the next Depth
+ recTypeInfos[pathIndex + 1].reset((ARecordType) subType);
+ }
+ if (subType.getTypeTag().equals(ATypeTag.UNION)) {
+ if (((AUnionType) subType).isNullableType()) {
+ subTypeTag = ((AUnionType) subType).getNullableType().getTypeTag();
+ subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
+ subTypeTag, false);
+ } else {
+ // union .. the general case
+ throw new NotImplementedException();
+ }
+ } else {
+ subTypeTag = subType.getTypeTag();
+ subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
+ subTypeTag, false);
+ }
+
+ if (pathIndex < fieldPointables.length - 1) {
+ //setup next iteration
+ subRecordTmpStream.reset();
+ subRecordTmpStream.write(subTypeTag.serialize());
+ subRecordTmpStream.write(serRecord, subFieldOffset, subFieldLength);
+ serRecord = subRecordTmpStream.getByteArray();
+ start = 0;
+
+ // type check
+ if (serRecord[start] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
+ return;
+ }
+ if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ throw new AlgebricksException("Field accessor is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[start]));
+ }
+ }
+ }
+
+ // Moving through open fields after we hit the first open field.
+ for (; pathIndex < fieldPointables.length; pathIndex++) {
+ openField = true;
+ subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetByName(serRecord, start, len,
+ fieldPointables[pathIndex].getByteArray(), fieldPointables[pathIndex].getStartOffset());
+ if (subFieldOffset < 0) {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
+ return;
+ }
+
+ subTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[subFieldOffset]);
+ subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag,
+ true) + 1;
+
+ if (pathIndex < fieldPointables.length - 1) {
+ //setup next iteration
+ start = subFieldOffset;
+ len = subFieldLength;
+
+ // type check
+ if (serRecord[start] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
+ return;
+ }
+ if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ throw new AlgebricksException("Field accessor is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[start]));
+ }
+ }
+ }
+ // emit the final result.
+ if (openField) {
+ result.set(serRecord, subFieldOffset, subFieldLength);
+ } else {
+ out.writeByte(subTypeTag.serialize());
+ out.write(serRecord, subFieldOffset, subFieldLength);
+ result.set(resultStorage);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java
deleted file mode 100644
index f9bb6fc..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessUtil.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.evaluators.functions.records;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class FieldAccessUtil {
- @SuppressWarnings("unchecked")
- private static ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @SuppressWarnings("unchecked")
- public static void getFieldsAbvs(ArrayBackedValueStorage[] abvsFields, DataOutput[] doFields,
- List<String> fieldPaths) throws AlgebricksException {
- AString as;
- for (int i = 0; i < fieldPaths.size(); i++) {
- abvsFields[i] = new ArrayBackedValueStorage();
- doFields[i] = abvsFields[i].getDataOutput();
- as = new AString(fieldPaths.get(i));
- try {
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as,
- doFields[i]);
- } catch (HyracksDataException e) {
- throw new AlgebricksException(e);
- }
- }
- }
-
- public static boolean checkType(byte tagId, DataOutput out) throws AlgebricksException {
- if (tagId == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
- try {
- nullSerde.serialize(ANull.NULL, out);
- } catch (HyracksDataException e) {
- throw new AlgebricksException(e);
- }
- return true;
- }
-
- if (tagId != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
- throw new AlgebricksException("Field accessor is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tagId));
- }
- return false;
- }
-
- public static void evaluate(IFrameTupleReference tuple, DataOutput out, ICopyEvaluator eval0,
- ArrayBackedValueStorage[] abvsFields, ArrayBackedValueStorage abvsRecord,
- ByteArrayAccessibleOutputStream subRecordTmpStream, ARecordType recordType,
- RuntimeRecordTypeInfo[] recTypeInfos) throws AlgebricksException {
- try {
- abvsRecord.reset();
- eval0.evaluate(tuple);
-
- int subFieldIndex = -1;
- int subFieldOffset = -1;
- int subFieldLength = -1;
- int nullBitmapSize = -1;
-
- IAType subType = recordType;
- recTypeInfos[0].reset(recordType);
-
- ATypeTag subTypeTag = ATypeTag.NULL;
- byte[] subRecord = abvsRecord.getByteArray();
- boolean openField = false;
- int i = 0;
-
- if (checkType(subRecord[0], out)) {
- return;
- }
-
- //Moving through closed fields
- for (; i < abvsFields.length; i++) {
- if (subType.getTypeTag().equals(ATypeTag.UNION)) {
- //enforced SubType
- subType = ((AUnionType) subType).getNullableType();
- if (subType.getTypeTag().serialize() != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
- throw new AlgebricksException("Field accessor is not defined for values of type " + subTypeTag);
- }
- if (subType.getTypeTag() == ATypeTag.RECORD) {
- recTypeInfos[i].reset((ARecordType) subType);
- }
- }
- subFieldIndex = recTypeInfos[i].getFieldIndex(abvsFields[i].getByteArray(),
- abvsFields[i].getStartOffset() + 1, abvsFields[i].getLength());
- if (subFieldIndex == -1) {
- break;
- }
- nullBitmapSize = ARecordType.computeNullBitmapSize((ARecordType) subType);
- subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetById(subRecord, subFieldIndex,
- nullBitmapSize, ((ARecordType) subType).isOpen());
- if (subFieldOffset == 0) {
- // the field is null, we checked the null bit map
- out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
- return;
- }
- subType = ((ARecordType) subType).getFieldTypes()[subFieldIndex];
- if (subType.getTypeTag() == ATypeTag.RECORD && i + 1 < abvsFields.length) {
- // Move to the next Depth
- recTypeInfos[i + 1].reset((ARecordType) subType);
- }
- if (subType.getTypeTag().equals(ATypeTag.UNION)) {
- if (((AUnionType) subType).isNullableType()) {
- subTypeTag = ((AUnionType) subType).getNullableType().getTypeTag();
- subFieldLength = NonTaggedFormatUtil.getFieldValueLength(subRecord, subFieldOffset, subTypeTag,
- false);
- } else {
- // union .. the general case
- throw new NotImplementedException();
- }
- } else {
- subTypeTag = subType.getTypeTag();
- subFieldLength = NonTaggedFormatUtil.getFieldValueLength(subRecord, subFieldOffset, subTypeTag,
- false);
- }
-
- if (i < abvsFields.length - 1) {
- //setup next iteration
- subRecordTmpStream.reset();
- subRecordTmpStream.write(subTypeTag.serialize());
- subRecordTmpStream.write(subRecord, subFieldOffset, subFieldLength);
- subRecord = subRecordTmpStream.getByteArray();
-
- if (checkType(subRecord[0], out)) {
- return;
- }
- }
- }
-
- //Moving through open fields
- for (; i < abvsFields.length; i++) {
- openField = true;
- subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetByName(subRecord,
- abvsFields[i].getByteArray());
- if (subFieldOffset < 0) {
- out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
- return;
- }
-
- subTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(subRecord[subFieldOffset]);
- subFieldLength = NonTaggedFormatUtil.getFieldValueLength(subRecord, subFieldOffset, subTypeTag, true)
- + 1;
-
- if (i < abvsFields.length - 1) {
- //setup next iteration
- subRecord = Arrays.copyOfRange(subRecord, subFieldOffset, subFieldOffset + subFieldLength);
-
- if (checkType(subRecord[0], out)) {
- return;
- }
- }
- }
- if (!openField) {
- out.writeByte(subTypeTag.serialize());
- }
- out.write(subRecord, subFieldOffset, subFieldLength);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java
index bc1eb4b..c916d62 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public class GetRecordFieldValueDescriptor extends AbstractScalarFunctionDynamicDescriptor {
@@ -47,7 +47,7 @@ public class GetRecordFieldValueDescriptor extends AbstractScalarFunctionDynamic
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+ public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
return new GetRecordFieldValueEvalFactory(args[0], args[1], recType);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
index 501c743..71ce3a9 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
@@ -21,83 +21,134 @@ package org.apache.asterix.runtime.evaluators.functions.records;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class GetRecordFieldValueEvalFactory implements ICopyEvaluatorFactory {
+public class GetRecordFieldValueEvalFactory implements IScalarEvaluatorFactory {
private static final long serialVersionUID = 1L;
- private ICopyEvaluatorFactory recordEvalFactory;
- private ICopyEvaluatorFactory fldNameEvalFactory;
+ private IScalarEvaluatorFactory recordEvalFactory;
+ private IScalarEvaluatorFactory fldNameEvalFactory;
private final ARecordType recordType;
- public GetRecordFieldValueEvalFactory(ICopyEvaluatorFactory recordEvalFactory,
- ICopyEvaluatorFactory fldNameEvalFactory, ARecordType recordType) {
+ public GetRecordFieldValueEvalFactory(IScalarEvaluatorFactory recordEvalFactory,
+ IScalarEvaluatorFactory fldNameEvalFactory, ARecordType recordType) {
this.recordEvalFactory = recordEvalFactory;
this.fldNameEvalFactory = fldNameEvalFactory;
this.recordType = recordType;
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
- private final DataOutput out = output.getDataOutput();
- private final ByteArrayAccessibleOutputStream subRecordTmpStream = new ByteArrayAccessibleOutputStream();
+ private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private final DataOutput out = resultStorage.getDataOutput();
- private final ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
- private final ArrayBackedValueStorage outInput1 = new ArrayBackedValueStorage();
- private final ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
- private final ICopyEvaluator eval1 = fldNameEvalFactory.createEvaluator(outInput1);
-
- private final int size = 1;
- private final ArrayBackedValueStorage abvsFields[] = new ArrayBackedValueStorage[size];
- private final DataOutput[] doFields = new DataOutput[size];
+ private final IPointable inputArg0 = new VoidPointable();
+ private final IPointable inputArg1 = new VoidPointable();
+ private final IScalarEvaluator recordEval = recordEvalFactory.createScalarEvaluator(ctx);
+ private final IScalarEvaluator fieldNameEval = fldNameEvalFactory.createScalarEvaluator(ctx);
@SuppressWarnings("unchecked")
private final ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
- private final RuntimeRecordTypeInfo[] recTypeInfos = new RuntimeRecordTypeInfo[size];
+ private final RuntimeRecordTypeInfo recTypeInfo = new RuntimeRecordTypeInfo();
{
- abvsFields[0] = new ArrayBackedValueStorage();
- doFields[0] = abvsFields[0].getDataOutput();
- for (int index = 0; index < size; ++index) {
- recTypeInfos[index] = new RuntimeRecordTypeInfo();
- }
+ recTypeInfo.reset(recordType);
}
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
try {
- outInput1.reset();
- eval1.evaluate(tuple);
+ resultStorage.reset();
+ fieldNameEval.evaluate(tuple, inputArg1);
+ byte[] serFldName = inputArg1.getByteArray();
+ int serFldNameOffset = inputArg1.getStartOffset();
+ int serFldNameLen = inputArg1.getLength();
+ if (serFldName[serFldNameOffset] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
+ return;
+ }
- byte[] serFldName = outInput1.getByteArray();
- if (serFldName[0] != ATypeTag.SERIALIZED_STRING_TYPE_TAG) {
+ recordEval.evaluate(tuple, inputArg0);
+ byte[] serRecord = inputArg0.getByteArray();
+ int serRecordOffset = inputArg0.getStartOffset();
+ int serRecordLen = inputArg0.getLength();
+ if (serRecord[serRecordOffset] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
return;
}
- abvsFields[0].reset();
- doFields[0].write(serFldName);
+ if (serRecord[serRecordOffset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ throw new AlgebricksException("Field accessor is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[serRecordOffset]));
+ }
+
+ int subFieldOffset = -1;
+ int subFieldLength = -1;
- FieldAccessUtil.evaluate(tuple, out, eval0, abvsFields, outInput0, subRecordTmpStream, recordType,
- recTypeInfos);
+ // Look at closed fields first.
+ int subFieldIndex = recTypeInfo.getFieldIndex(serFldName, serFldNameOffset + 1, serFldNameLen - 1);
+ if (subFieldIndex >= 0) {
+ int nullBitmapSize = ARecordType.computeNullBitmapSize(recordType);
+ subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetById(serRecord, serRecordOffset,
+ subFieldIndex, nullBitmapSize, recordType.isOpen());
+ if (subFieldOffset == 0) {
+ // the field is null, we checked the null bit map
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
+ return;
+ }
+ ATypeTag fieldTypeTag = recordType.getFieldTypes()[subFieldIndex].getTypeTag();
+ subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
+ fieldTypeTag, false);
+ // write result.
+ out.writeByte(fieldTypeTag.serialize());
+ out.write(serRecord, subFieldOffset, subFieldLength);
+ result.set(resultStorage);
+ return;
+ }
+
+ // Look at open fields.
+ subFieldOffset = ARecordSerializerDeserializer.getFieldOffsetByName(serRecord, serRecordOffset,
+ serRecordLen, serFldName, serFldNameOffset);
+ if (subFieldOffset < 0) {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
+ return;
+ }
+ // Get the field length.
+ ATypeTag fieldValueTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(serRecord[subFieldOffset]);
+ subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
+ fieldValueTypeTag, true) + 1;
+ // write result.
+ result.set(serRecord, subFieldOffset, subFieldLength);
} catch (IOException e) {
throw new AlgebricksException(e);
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java
index 76d19d3..e2b5ec1 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public class GetRecordFieldsDescriptor extends AbstractScalarFunctionDynamicDescriptor {
@@ -47,7 +47,7 @@ public class GetRecordFieldsDescriptor extends AbstractScalarFunctionDynamicDesc
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(ICopyEvaluatorFactory[] args) {
+ public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
return new GetRecordFieldsEvalFactory(args[0], recType);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java
index d3f7a79..a24906d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldsEvalFactory.java
@@ -30,27 +30,31 @@ import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class GetRecordFieldsEvalFactory implements ICopyEvaluatorFactory {
+public class GetRecordFieldsEvalFactory implements IScalarEvaluatorFactory {
+
private static final long serialVersionUID = 1L;
- private ICopyEvaluatorFactory recordEvalFactory;
+
+ private IScalarEvaluatorFactory recordEvalFactory;
private final ARecordType recordType;
- public GetRecordFieldsEvalFactory(ICopyEvaluatorFactory recordEvalFactory, ARecordType recordType) {
+ public GetRecordFieldsEvalFactory(IScalarEvaluatorFactory recordEvalFactory, ARecordType recordType) {
this.recordEvalFactory = recordEvalFactory;
this.recordType = recordType;
}
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
- return new ICopyEvaluator() {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IScalarEvaluator() {
@SuppressWarnings("unchecked")
private final ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
@@ -58,32 +62,36 @@ public class GetRecordFieldsEvalFactory implements ICopyEvaluatorFactory {
private final ARecordPointable recordPointable = (ARecordPointable) ARecordPointable.FACTORY
.createPointable();
-
- private ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
- private ICopyEvaluator eval0 = recordEvalFactory.createEvaluator(outInput0);
- private DataOutput out = output.getDataOutput();
+ private IPointable inputArg0 = new VoidPointable();
+ private IScalarEvaluator eval0 = recordEvalFactory.createScalarEvaluator(ctx);
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private DataOutput out = resultStorage.getDataOutput();
private RecordFieldsUtil rfu = new RecordFieldsUtil();
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
- outInput0.reset();
- eval0.evaluate(tuple);
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+ resultStorage.reset();
+ eval0.evaluate(tuple, inputArg0);
+ byte[] data = inputArg0.getByteArray();
+ int offset = inputArg0.getStartOffset();
+ int len = inputArg0.getLength();
- if (outInput0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ if (data[offset] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
try {
nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
+ return;
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
}
- if (outInput0.getByteArray()[0] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ if (data[offset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
throw new AlgebricksException("Field accessor is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(outInput0.getByteArray()[0]));
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]));
}
- recordPointable.set(outInput0.getByteArray(), outInput0.getStartOffset(), outInput0.getLength());
-
+ recordPointable.set(data, offset, len);
try {
rfu.processRecord(recordPointable, recordType, out, 0);
} catch (IOException e) {
@@ -91,6 +99,7 @@ public class GetRecordFieldsEvalFactory implements ICopyEvaluatorFactory {
} catch (AsterixException e) {
e.printStackTrace();
}
+ result.set(resultStorage);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
index 9e7c4ac..eee240b 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.asterix.runtime.evaluators.functions.records;
-import java.io.IOException;
+import java.io.DataOutput;
import java.util.List;
import org.apache.asterix.builders.RecordBuilder;
@@ -49,13 +49,15 @@ import org.apache.asterix.runtime.evaluators.functions.BinaryHashMap;
import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -84,8 +86,9 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopyEvaluatorFactory() {
+ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new IScalarEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@SuppressWarnings("unchecked")
@@ -93,16 +96,16 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc
.getSerializerDeserializer(BuiltinType.ANULL);
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
final PointableAllocator allocator = new PointableAllocator();
final IVisitablePointable vp0 = allocator.allocateRecordValue(inRecType);
final IVisitablePointable vp1 = allocator.allocateListValue(inListType);
- final ArrayBackedValueStorage abvs0 = new ArrayBackedValueStorage();
- final ArrayBackedValueStorage abvs1 = new ArrayBackedValueStorage();
+ final IPointable argPtr0 = new VoidPointable();
+ final IPointable argPtr1 = new VoidPointable();
- final ICopyEvaluator eval0 = args[0].createEvaluator(abvs0);
- final ICopyEvaluator eval1 = args[1].createEvaluator(abvs1);
+ final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx);
+ final IScalarEvaluator eval1 = args[1].createScalarEvaluator(ctx);
final ArrayBackedValueStorage fieldNamePointable = new ArrayBackedValueStorage();
final ArrayBackedValueStorage fieldValuePointer = new ArrayBackedValueStorage();
@@ -114,7 +117,7 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc
throw new AlgebricksException(e);
}
- return new ICopyEvaluator() {
+ return new IScalarEvaluator() {
public static final int TABLE_FRAME_SIZE = 32768; // the default 32k frame size
public static final int TABLE_SIZE = 100; // the default 32k frame size
private final RecordBuilder recordBuilder = new RecordBuilder();
@@ -132,42 +135,46 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc
private BinaryHashMap hashMap = new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc,
getHashFunc, cmp);
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private DataOutput out = resultStorage.getDataOutput();
+
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+ resultStorage.reset();
recordBuilder.reset(outRecType);
requiredRecordTypeInfo.reset(outRecType);
- abvs0.reset();
- abvs1.reset();
-
- eval0.evaluate(tuple);
- eval1.evaluate(tuple);
+ eval0.evaluate(tuple, argPtr0);
+ eval1.evaluate(tuple, argPtr1);
- if (abvs0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
- || abvs1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ if (argPtr0.getByteArray()[argPtr0.getStartOffset()] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argPtr1.getByteArray()[argPtr1
+ .getStartOffset()] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
try {
- nullSerDe.serialize(ANull.NULL, output.getDataOutput());
+ nullSerDe.serialize(ANull.NULL, out);
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
+ result.set(resultStorage);
return;
}
// Make sure we get a valid record
- if (abvs0.getByteArray()[0] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+ if (argPtr0.getByteArray()[argPtr0.getStartOffset()] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
throw new AlgebricksException("Expected an ordederlist of type " + inRecType + " but "
- + "got "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(abvs0.getByteArray()[0]));
+ + "got " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argPtr0.getByteArray()[argPtr0.getStartOffset()]));
}
// Make sure we get a valid list
- if (abvs1.getByteArray()[0] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG) {
+ if (argPtr1.getByteArray()[argPtr1
+ .getStartOffset()] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG) {
throw new AlgebricksException("Expected an ordederlist of type " + inListType + " but "
- + "got "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(abvs1.getByteArray()[0]));
+ + "got " + EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argPtr1.getByteArray()[argPtr1.getStartOffset()]));
}
- vp0.set(abvs0);
- vp1.set(abvs1);
+ vp0.set(argPtr0);
+ vp1.set(argPtr1);
try {
ARecordVisitablePointable recordPointable = (ARecordVisitablePointable) vp0;
@@ -183,10 +190,11 @@ public class RecordAddFieldsDescriptor extends AbstractScalarFunctionDynamicDesc
hashMap.clear();
}
addFields(recordPointable, listPointable);
- recordBuilder.write(output.getDataOutput(), true);
+ recordBuilder.write(out, true);
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
+ result.set(resultStorage);
}
private void addFields(ARecordVisitablePointable inputRecordPointer,
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java
index bbba625..93c1f1b 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordFieldsUtil.java
@@ -54,10 +54,6 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public class RecordFieldsUtil {
- private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
- private final static byte SER_ORDERED_LIST_TYPE_TAG = ATypeTag.ORDEREDLIST.serialize();
- private final static byte SER_UNORDERED_LIST_TYPE_TAG = ATypeTag.UNORDEREDLIST.serialize();
-
private final static AString fieldName = new AString("field-name");
private final static AString typeName = new AString("field-type");
private final static AString isOpenName = new AString("is-open");
@@ -118,15 +114,17 @@ public class RecordFieldsUtil {
addIsOpenField(false, fieldRecordBuilder);
// write nested or list types
- if (tag == SER_RECORD_TYPE_TAG || tag == SER_ORDERED_LIST_TYPE_TAG || tag == SER_UNORDERED_LIST_TYPE_TAG) {
+ if (tag == ATypeTag.SERIALIZED_RECORD_TYPE_TAG || tag == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
+ || tag == ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
if (!recordAccessor.isClosedFieldNull(recType, i)) {
IAType fieldType = recordAccessor.getClosedFieldType(recType, i);
ArrayBackedValueStorage tmpValue = getTempBuffer();
tmpValue.reset();
recordAccessor.getClosedFieldValue(recType, i, tmpValue.getDataOutput());
- if (tag == SER_RECORD_TYPE_TAG) {
+ if (tag == ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
addNestedField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
- } else if (tag == SER_ORDERED_LIST_TYPE_TAG || tag == SER_UNORDERED_LIST_TYPE_TAG) {
+ } else if (tag == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
+ || tag == ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
addListField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
}
}
@@ -155,14 +153,16 @@ public class RecordFieldsUtil {
addIsOpenField(true, fieldRecordBuilder);
// write nested or list types
- if (tag == SER_RECORD_TYPE_TAG || tag == SER_ORDERED_LIST_TYPE_TAG || tag == SER_UNORDERED_LIST_TYPE_TAG) {
+ if (tag == ATypeTag.SERIALIZED_RECORD_TYPE_TAG || tag == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
+ || tag == ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
IAType fieldType = null;
ArrayBackedValueStorage tmpValue = getTempBuffer();
tmpValue.reset();
recordAccessor.getOpenFieldValue(recType, i, tmpValue.getDataOutput());
- if (tag == SER_RECORD_TYPE_TAG) {
+ if (tag == ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
addNestedField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
- } else if (tag == SER_ORDERED_LIST_TYPE_TAG || tag == SER_UNORDERED_LIST_TYPE_TAG) {
+ } else if (tag == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
+ || tag == ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
addListField(tmpValue, fieldType, fieldRecordBuilder, level + 1);
}
}
@@ -277,7 +277,7 @@ public class RecordFieldsUtil {
byte tagId = list.getItemTag(act, l);
addFieldType(tagId, listRecordBuilder);
- if (tagId == SER_RECORD_TYPE_TAG) {
+ if (tagId == ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
ArrayBackedValueStorage tmpAbvs = getTempBuffer();
list.getItemValue(act, l, tmpAbvs.getDataOutput());
addNestedField(tmpAbvs, act.getItemType(), listRecordBuilder, level + 1);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
index afd6fc1..0f11a48 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.runtime.evaluators.functions.records;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -44,11 +45,13 @@ import org.apache.asterix.runtime.evaluators.comparisons.DeepEqualAssessor;
import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -81,8 +84,9 @@ public class RecordMergeDescriptor extends AbstractScalarFunctionDynamicDescript
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopyEvaluatorFactory() {
+ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new IScalarEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@@ -91,57 +95,58 @@ public class RecordMergeDescriptor extends AbstractScalarFunctionDynamicDescript
.getSerializerDeserializer(BuiltinType.ANULL);
@Override
- public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
final PointableAllocator pa = new PointableAllocator();
final IVisitablePointable vp0 = pa.allocateRecordValue(inRecType0);
final IVisitablePointable vp1 = pa.allocateRecordValue(inRecType1);
- final ArrayBackedValueStorage abvs0 = new ArrayBackedValueStorage();
- final ArrayBackedValueStorage abvs1 = new ArrayBackedValueStorage();
+ final IPointable argPtr0 = new VoidPointable();
+ final IPointable argPtr1 = new VoidPointable();
- final ICopyEvaluator eval0 = args[0].createEvaluator(abvs0);
- final ICopyEvaluator eval1 = args[1].createEvaluator(abvs1);
+ final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx);
+ final IScalarEvaluator eval1 = args[1].createScalarEvaluator(ctx);
final List<RecordBuilder> rbStack = new ArrayList<>();
final ArrayBackedValueStorage tabvs = new ArrayBackedValueStorage();
- return new ICopyEvaluator() {
+ return new IScalarEvaluator() {
private final RuntimeRecordTypeInfo runtimeRecordTypeInfo = new RuntimeRecordTypeInfo();
private final DeepEqualAssessor deepEqualAssesor = new DeepEqualAssessor();
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private DataOutput out = resultStorage.getDataOutput();
@Override
- public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
- abvs0.reset();
- abvs1.reset();
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+ resultStorage.reset();
+ eval0.evaluate(tuple, argPtr0);
+ eval1.evaluate(tuple, argPtr1);
- eval0.evaluate(tuple);
- eval1.evaluate(tuple);
-
- if (abvs0.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
- || abvs1.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ if (argPtr0.getByteArray()[argPtr0.getStartOffset()] == ATypeTag.SERIALIZED_NULL_TYPE_TAG
+ || argPtr1.getByteArray()[argPtr1.getStartOffset()] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
try {
- nullSerDe.serialize(ANull.NULL, output.getDataOutput());
+ nullSerDe.serialize(ANull.NULL, out);
} catch (HyracksDataException e) {
throw new AlgebricksException(e);
}
+ result.set(resultStorage);
return;
}
- vp0.set(abvs0);
- vp1.set(abvs1);
+ vp0.set(argPtr0);
+ vp1.set(argPtr1);
ARecordVisitablePointable rp0 = (ARecordVisitablePointable) vp0;
ARecordVisitablePointable rp1 = (ARecordVisitablePointable) vp1;
try {
mergeFields(outRecType, rp0, rp1, true, 0);
-
- rbStack.get(0).write(output.getDataOutput(), true);
+ rbStack.get(0).write(out, true);
} catch (IOException | AsterixException e) {
throw new AlgebricksException(e);
}
+ result.set(resultStorage);
}
private void mergeFields(ARecordType combinedType, ARecordVisitablePointable leftRecord,
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java
index ae752f9..09639d7 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java
@@ -27,7 +27,7 @@ import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
public class RecordRemoveFieldsDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
@@ -52,7 +52,8 @@ public class RecordRemoveFieldsDescriptor extends AbstractScalarFunctionDynamicD
}
@Override
- public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+ throws AlgebricksException {
return new RecordRemoveFieldsEvalFactory(args[0], args[1], outputRecordType, inputRecType, inputListType);
}