You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by ch...@apache.org on 2017/05/22 18:11:56 UTC
[2/3] vxquery git commit: VXQUERY-105: Add group-by functionality,
Add scalability to JSON parser
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
new file mode 100644
index 0000000..1d8a55d
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.compiler.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.vxquery.compiler.rewriter.rules.util.ExpressionToolbox;
+import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.functions.BuiltinOperators;
+import org.apache.vxquery.metadata.IVXQueryDataSource;
+import org.apache.vxquery.metadata.VXQueryCollectionDataSource;
+
+/**
+ * The rule searches for an assign operator immediately following a data scan
+ * operator.
+ *
+ * <pre>
+ * Before
+ *
+ * plan__parent
+ * ASSIGN( $v2 : value( $v1, constant) )
+ * DATASCAN( $source : $v1 )
+ * plan__child
+ *
+ * Where $v1 is not used in plan__parent.
+ *
+ * After
+ *
+ * plan__parent
+ * ASSIGN( $v2 : $v1 )
+ * DATASCAN( $source : $v1 )
+ * plan__child
+ *
+ * $source is encoded with the value parameters.
+ * </pre>
+ */
+
+public class PushValueIntoDatascanRule extends AbstractPushExpressionIntoDatascanRule {
+
+ @Override
+ boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
+ VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) datasource;
+ boolean added = false;
+ List<Mutable<ILogicalExpression>> finds = new ArrayList<Mutable<ILogicalExpression>>();
+ ILogicalExpression le = expression.getValue();
+ if (le.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) le;
+ if (afce.getFunctionIdentifier().equals(BuiltinFunctions.FN_ZERO_OR_ONE_1.getFunctionIdentifier())) {
+ return false;
+ }
+ }
+ ExpressionToolbox.findAllFunctionExpressions(expression, BuiltinOperators.VALUE.getFunctionIdentifier(), finds);
+
+ for (int i = finds.size(); i > 0; --i) {
+ Byte[] value = null;
+ List<ILogicalExpression> values = ExpressionToolbox.getFullArguments(finds.get(i - 1));
+ if (values.size() > 1) {
+ value = ExpressionToolbox.getConstantArgument(finds.get(i - 1), 1);
+ ds.addValueSeq(value);
+ added = true;
+ }
+ }
+
+ return added;
+ }
+
+ @Override
+ LogicalOperatorTag getOperator() {
+ return LogicalOperatorTag.ASSIGN;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
index 69940ad..d86de98 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
@@ -94,12 +95,14 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
return false;
}
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
boolean operatorChanged = false;
// Do not process empty or nested tuple source.
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
@@ -151,7 +154,9 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
}
}
}
-
+ if (operatorChanged) {
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ }
// Now with the new operator, update the variable mappings.
cardinalityVariable = CardinalityRuleToolbox.updateCardinalityVariable(op, cardinalityVariable, vxqueryContext);
updateVariableMap(op, cardinalityVariable, documentOrderVariables, uniqueNodesVariables, vxqueryContext);
@@ -178,8 +183,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
return 0;
}
AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;
- if (!functionCall.getFunctionIdentifier().equals(
- BuiltinOperators.SORT_DISTINCT_NODES_ASC_OR_ATOMICS.getFunctionIdentifier())) {
+ if (!functionCall.getFunctionIdentifier()
+ .equals(BuiltinOperators.SORT_DISTINCT_NODES_ASC_OR_ATOMICS.getFunctionIdentifier())) {
return 0;
}
@@ -314,7 +319,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
* @param uniqueNodesVariables
* @param uniqueNodes
*/
- private void resetUniqueNodesVariables(HashMap<Integer, UniqueNodes> uniqueNodesVariables, UniqueNodes uniqueNodes) {
+ private void resetUniqueNodesVariables(HashMap<Integer, UniqueNodes> uniqueNodesVariables,
+ UniqueNodes uniqueNodes) {
for (Entry<Integer, UniqueNodes> entry : uniqueNodesVariables.entrySet()) {
uniqueNodesVariables.put(entry.getKey(), uniqueNodes);
}
@@ -349,8 +355,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
case ASSIGN:
AssignOperator assign = (AssignOperator) op;
for (int index = 0; index < assign.getExpressions().size(); index++) {
- ILogicalExpression assignLogicalExpression = (ILogicalExpression) assign.getExpressions()
- .get(index).getValue();
+ ILogicalExpression assignLogicalExpression = (ILogicalExpression) assign.getExpressions().get(index)
+ .getValue();
variableId = assign.getVariables().get(index).getId();
documentOrder = propagateDocumentOrder(assignLogicalExpression, documentOrderVariablesForOperator);
uniqueNodes = propagateUniqueNodes(assignLogicalExpression, uniqueNodesVariablesForOperator);
@@ -384,8 +390,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
// Find the last operator to set a variable and call this function again.
SubplanOperator subplan = (SubplanOperator) op;
for (int index = 0; index < subplan.getNestedPlans().size(); index++) {
- AbstractLogicalOperator lastOperator = (AbstractLogicalOperator) subplan.getNestedPlans()
- .get(index).getRoots().get(0).getValue();
+ AbstractLogicalOperator lastOperator = (AbstractLogicalOperator) subplan.getNestedPlans().get(index)
+ .getRoots().get(0).getValue();
updateVariableMap(lastOperator, cardinalityVariable, documentOrderVariables, uniqueNodesVariables,
vxqueryContext);
}
@@ -395,8 +401,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
UnnestOperator unnest = (UnnestOperator) op;
ILogicalExpression unnestLogicalExpression = (ILogicalExpression) unnest.getExpressionRef().getValue();
variableId = unnest.getVariables().get(0).getId();
- Cardinality inputCardinality = vxqueryContext.getCardinalityOperatorMap(op.getInputs().get(0)
- .getValue());
+ Cardinality inputCardinality = vxqueryContext
+ .getCardinalityOperatorMap(op.getInputs().get(0).getValue());
documentOrder = propagateDocumentOrder(unnestLogicalExpression, documentOrderVariablesForOperator);
uniqueNodes = propagateUniqueNodes(unnestLogicalExpression, uniqueNodesVariablesForOperator);
@@ -425,10 +431,12 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
break;
// The following operators do not change or add to the variable map.
+
case DATASOURCESCAN:
case DISTRIBUTE_RESULT:
case EMPTYTUPLESOURCE:
case EXCHANGE:
+ case GROUP:
case NESTEDTUPLESOURCE:
case PROJECT:
case SELECT:
@@ -438,8 +446,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
// The following operators' analysis has not yet been implemented.
default:
- throw new RuntimeException("Operator (" + op.getOperatorTag()
- + ") has not been implemented in rewrite rule.");
+ throw new RuntimeException(
+ "Operator (" + op.getOperatorTag() + ") has not been implemented in rewrite rule.");
}
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
index 82be94c..3fb2696 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
@@ -28,11 +28,13 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
* Set the default context for the variable id in the optimization context.
+ *
* @author prestonc
*/
public class SetVariableIdContextRule implements IAlgebraicRewriteRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
if (context.checkIfInDontApplySet(this, opRef.getValue())) {
return false;
}
@@ -44,7 +46,9 @@ public class SetVariableIdContextRule implements IAlgebraicRewriteRule {
case ASSIGN:
case AGGREGATE:
AbstractAssignOperator assign = (AbstractAssignOperator) op;
- variableId = assign.getVariables().get(0).getId();
+ if (assign.getVariables().size() > 0) {
+ variableId = assign.getVariables().get(0).getId();
+ }
break;
case UNNEST:
UnnestOperator unnest = (UnnestOperator) op;
@@ -62,7 +66,8 @@ public class SetVariableIdContextRule implements IAlgebraicRewriteRule {
}
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
index 0dd3b31..50bc07e 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
@@ -16,8 +16,10 @@
*/
package org.apache.vxquery.compiler.rewriter.rules.util;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
import org.apache.vxquery.context.StaticContext;
@@ -211,6 +213,27 @@ public class ExpressionToolbox {
return pTypeCode.getInteger();
}
+ public static Byte[] getConstantArgument(Mutable<ILogicalExpression> searchM, int arg) {
+ AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue();
+ ILogicalExpression argType = searchFunction.getArguments().get(arg).getValue();
+ searchFunction.getArguments().size();
+ if (argType.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ return null;
+ }
+ TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ ExpressionToolbox.getConstantAsPointable((ConstantExpression) argType, tvp);
+ return ArrayUtils.toObject(tvp.getByteArray());
+ }
+
+ public static List<ILogicalExpression> getFullArguments(Mutable<ILogicalExpression> searchM) {
+ AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue();
+ ArrayList<ILogicalExpression> args = new ArrayList<ILogicalExpression>();
+ for (int i = 0; i < searchFunction.getArguments().size(); i++) {
+ args.add(searchFunction.getArguments().get(i).getValue());
+ }
+ return args;
+ }
+
public static SequenceType getTypeExpressionTypeArgument(Mutable<ILogicalExpression> searchM, StaticContext dCtx) {
int typeId = getTypeExpressionTypeArgument(searchM);
if (typeId > 0) {
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
index 2c57c32..94040d2 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
@@ -20,7 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java b/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
index a3f7bf8..886229d 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
@@ -51,6 +51,11 @@ public class SystemException extends HyracksDataException {
super(message(code, loc));
this.code = code;
}
+
+ public SystemException(ErrorCode code, SourceLocation loc, Throwable cause) {
+ super(message(code, loc), cause);
+ this.code = code;
+ }
public ErrorCode getCode() {
return code;
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
index 0b03c34..ec27864 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
@@ -660,6 +660,7 @@
<param name="expr" type="json-item()"/>
<return type="xs:string*"/>
<runtime type="scalar" class="org.apache.vxquery.runtime.functions.json.KeysOrMembersScalarEvaluatorFactory"/>
+ <runtime type="unnesting" class="org.apache.vxquery.runtime.functions.json.KeysOrMembersUnnestingEvaluatorFactory"/>
</operator>
<!-- opext:subtract($arg1 as xs:anyAtomicType?, $arg2 as xs:anyAtomicType?) as xs:anyAtomicType? -->
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
index ad8db4e..edf8dbc 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
@@ -1,43 +1,54 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+* http://www.apache.org/licenses/LICENSE-2.0
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.vxquery.jsonparser;
+import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Reader;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.htrace.fasterxml.jackson.core.JsonFactory;
+import org.apache.htrace.fasterxml.jackson.core.JsonParser;
+import org.apache.htrace.fasterxml.jackson.core.JsonToken;
+import org.apache.hyracks.api.comm.IFrameFieldAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
import org.apache.vxquery.datamodel.builders.atomic.StringValueBuilder;
import org.apache.vxquery.datamodel.builders.jsonitem.ArrayBuilder;
import org.apache.vxquery.datamodel.builders.jsonitem.ObjectBuilder;
import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.datamodel.values.XDMConstants;
import org.apache.vxquery.xmlparser.IParser;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-
public class JSONParser implements IParser {
-
final JsonFactory factory;
+ final List<Byte[]> valueSeq;
protected final ArrayBackedValueStorage atomic;
+ private TaggedValuePointable tvp;
+ private BooleanPointable bp;
protected final List<ArrayBuilder> abStack;
protected final List<ObjectBuilder> obStack;
protected final List<ArrayBackedValueStorage> abvsStack;
@@ -48,6 +59,16 @@ public class JSONParser implements IParser {
protected final DataOutput out;
protected itemType checkItem;
protected int levelArray, levelObject;
+ protected final List<Byte[]> allKeys;
+ protected ByteArrayOutputStream outputStream, prefixStream, pathStream;
+ protected int objectMatchLevel;
+ protected int arrayMatchLevel;
+ protected boolean matched, literal;
+ protected ArrayBackedValueStorage tempABVS;
+ protected List<Integer> arrayCounters;
+ protected List<Boolean> keysOrMembers;
+ protected IFrameWriter writer;
+ protected IFrameFieldAppender appender;
enum itemType {
ARRAY,
@@ -57,8 +78,14 @@ public class JSONParser implements IParser {
protected final List<itemType> itemStack;
public JSONParser() {
+ this(null);
+ }
+
+ public JSONParser(List<Byte[]> valueSeq) {
factory = new JsonFactory();
+ this.valueSeq = valueSeq;
atomic = new ArrayBackedValueStorage();
+ tvp = new TaggedValuePointable();
abStack = new ArrayList<ArrayBuilder>();
obStack = new ArrayList<ObjectBuilder>();
abvsStack = new ArrayList<ArrayBackedValueStorage>();
@@ -67,9 +94,59 @@ public class JSONParser implements IParser {
itemStack = new ArrayList<itemType>();
svb = new StringValueBuilder();
sb = new SequenceBuilder();
+ bp = new BooleanPointable();
+ allKeys = new ArrayList<Byte[]>();
abvsStack.add(atomic);
out = abvsStack.get(abvsStack.size() - 1).getDataOutput();
+ tempABVS = new ArrayBackedValueStorage();
+ this.objectMatchLevel = 1;
+ this.arrayMatchLevel = 0;
+ matched = false;
+ literal = false;
+ arrayCounters = new ArrayList<Integer>();
+ outputStream = new ByteArrayOutputStream();
+ prefixStream = new ByteArrayOutputStream();
+ pathStream = new ByteArrayOutputStream();
+ this.keysOrMembers = new ArrayList<Boolean>();
+ outputStream.reset();
+ pathStream.reset();
+ if (valueSeq != null) {
+ for (int i = 0; i < this.valueSeq.size(); i++) {
+ tvp.set(ArrayUtils.toPrimitive(valueSeq.get(i)), 0, ArrayUtils.toPrimitive(valueSeq.get(i)).length);
+ //access an item of an array
+ if (tvp.getTag() == ValueTag.XS_INTEGER_TAG) {
+ pathStream.write(tvp.getByteArray(), 0, tvp.getLength());
+ this.arrayMatchLevel++;
+ this.keysOrMembers.add(Boolean.valueOf(true));
+ //access all the items of an array or
+ //all the keys of an object
+ } else if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) {
+ pathStream.write(tvp.getByteArray(), 0, tvp.getLength());
+ this.arrayMatchLevel++;
+ this.keysOrMembers.add(Boolean.valueOf(false));
+ //access an object
+ } else {
+ pathStream.write(tvp.getByteArray(), 1, tvp.getLength() - 1);
+ }
+ }
+ }
+ }
+ Byte[] toBytes(Integer v) {
+ Byte[] barr = ArrayUtils.toObject(ByteBuffer.allocate(9).putLong(1, v).array());
+ barr[0] = ValueTag.XS_INTEGER_TAG;
+ return barr;
+ }
+
+ public int parse(Reader input, ArrayBackedValueStorage result, IFrameWriter writer, IFrameFieldAppender appender)
+ throws HyracksDataException {
+ this.writer = writer;
+ this.appender = appender;
+ if (this.valueSeq != null) {
+ return parseElements(input, result);
+ } else {
+ return parse(input, result);
+ }
}
public int parse(Reader input, ArrayBackedValueStorage result) throws HyracksDataException {
@@ -79,7 +156,6 @@ public class JSONParser implements IParser {
JsonParser parser = factory.createParser(input);
JsonToken token = parser.nextToken();
checkItem = null;
-
levelArray = 0;
levelObject = 0;
sb.reset(result);
@@ -89,47 +165,22 @@ public class JSONParser implements IParser {
}
switch (token) {
case START_ARRAY:
- levelArray++;
- if (levelArray > abStack.size()) {
- abStack.add(new ArrayBuilder());
- }
- if (levelArray + levelObject > abvsStack.size() - 1) {
- abvsStack.add(new ArrayBackedValueStorage());
- }
- itemStack.add(itemType.ARRAY);
- abvsStack.get(levelArray + levelObject).reset();
- abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject));
+ startArray();
break;
case START_OBJECT:
- levelObject++;
- if (levelObject > obStack.size()) {
- obStack.add(new ObjectBuilder());
- }
- if (levelArray + levelObject > abvsStack.size() - 1) {
- abvsStack.add(new ArrayBackedValueStorage());
- }
- itemStack.add(itemType.OBJECT);
- abvsStack.get(levelArray + levelObject).reset();
- obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject));
+ startObject();
break;
case FIELD_NAME:
- if (levelObject > spStack.size()) {
- keyStack.add(new ArrayBackedValueStorage());
- spStack.add(new UTF8StringPointable());
- }
- keyStack.get(levelObject - 1).reset();
- DataOutput outk = keyStack.get(levelObject - 1).getDataOutput();
- svb.write(parser.getText(), outk);
- spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1));
+ startFieldName(parser);
break;
case VALUE_NUMBER_INT:
- atomicValues(ValueTag.XS_INTEGER_TAG, parser, out, svb, levelArray, levelObject);
+ startAtomicValues(ValueTag.XS_INTEGER_TAG, parser);
break;
case VALUE_STRING:
- atomicValues(ValueTag.XS_STRING_TAG, parser, out, svb, levelArray, levelObject);
+ startAtomicValues(ValueTag.XS_STRING_TAG, parser);
break;
case VALUE_NUMBER_FLOAT:
- atomicValues(ValueTag.XS_DOUBLE_TAG, parser, out, svb, levelArray, levelObject);
+ startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser);
break;
case END_ARRAY:
abStack.get(levelArray - 1).finish();
@@ -173,11 +224,160 @@ public class JSONParser implements IParser {
sb.finish();
outResult.write(result.getByteArray());
} catch (Exception e) {
- throw new HyracksDataException(e.toString());
+ throw new HyracksDataException("Accessing or writing in out of bounds space", e);
+ }
+ return items;
+ }
+
+ public int parseElements(Reader input, ArrayBackedValueStorage result) throws HyracksDataException {
+ int items = 0;
+ try {
+ JsonParser parser = factory.createParser(input);
+ JsonToken token = parser.nextToken();
+ checkItem = null;
+
+ this.objectMatchLevel = 0;
+ this.matched = false;
+
+ levelArray = 0;
+ levelObject = 0;
+ sb.reset(result);
+ while (token != null) {
+ if (itemStack.size() > 1) {
+ checkItem = itemStack.get(itemStack.size() - 2);
+ }
+ switch (token) {
+ case START_ARRAY:
+ startArray();
+ break;
+ case START_OBJECT:
+ startObject();
+ break;
+ case FIELD_NAME:
+ startFieldName(parser);
+ break;
+ case VALUE_NUMBER_INT:
+ startAtomicValues(ValueTag.XS_INTEGER_TAG, parser);
+ break;
+ case VALUE_STRING:
+ startAtomicValues(ValueTag.XS_STRING_TAG, parser);
+ break;
+ case VALUE_NUMBER_FLOAT:
+ startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser);
+ break;
+ case END_ARRAY:
+ //if the query doesn't ask for an atomic value
+ if (!this.literal && this.pathMatch()) {
+ //check if the path asked from the query includes the current path
+ abStack.get(levelArray - 1).finish();
+ if (itemStack.size() > 1) {
+ if (checkItem == itemType.ARRAY) {
+ if (levelArray > this.arrayMatchLevel + 1) {
+ abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject));
+ } else if (this.matched) {
+ this.matched = false;
+ items++;
+ writeElement(abvsStack.get(levelArray + levelObject));
+ }
+ } else if (checkItem == itemType.OBJECT) {
+ if (levelArray > this.arrayMatchLevel && !this.matched) {
+ obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1),
+ abvsStack.get(levelArray + levelObject));
+ } else if (this.matched) {
+ writeElement(abvsStack.get(levelArray + levelObject));
+ this.matched = false;
+ items++;
+ }
+ }
+ }
+ }
+ if (allKeys.size() - 1 >= 0) {
+ allKeys.remove(allKeys.size() - 1);
+ }
+ this.arrayCounters.remove(levelArray - 1);
+ itemStack.remove(itemStack.size() - 1);
+ levelArray--;
+ break;
+ case END_OBJECT:
+ //if the query doesn't ask for an atomic value
+ if (!this.literal && this.pathMatch()) {
+ //check if the path asked from the query includes the current path
+ obStack.get(levelObject - 1).finish();
+ if (itemStack.size() > 1) {
+ if (checkItem == itemType.OBJECT) {
+ if (levelObject > this.objectMatchLevel) {
+ obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2),
+ abvsStack.get(levelArray + levelObject));
+ } else if (this.matched) {
+ this.matched = false;
+ items++;
+ writeElement(abvsStack.get(levelArray + levelObject));
+ }
+ } else if (checkItem == itemType.ARRAY) {
+ abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject));
+ if (this.matched) {
+ writeElement(abvsStack.get(levelArray + levelObject));
+ this.matched = false;
+ }
+ }
+ }
+ }
+ if (allKeys.size() - 1 >= 0) {
+ allKeys.remove(allKeys.size() - 1);
+ }
+ itemStack.remove(itemStack.size() - 1);
+ levelObject--;
+ break;
+ default:
+ break;
+ }
+ token = parser.nextToken();
+ }
+ sb.finish();
+ } catch (Exception e) {
+ throw new HyracksDataException("Accessing or writing in out of bounds space", e);
}
return items;
}
+ private boolean pathMatch() {
+ outputStream.reset();
+ for (Byte[] bb : allKeys) {
+ outputStream.write(ArrayUtils.toPrimitive(bb), 0, ArrayUtils.toPrimitive(bb).length);
+ }
+ //the path of values created by parsing the file
+ boolean contains = false;
+ this.matched = false;
+ prefixStream.reset();
+ if (pathStream.size() < outputStream.size()) {
+ prefixStream.write(outputStream.toByteArray(), 0, pathStream.size());
+ contains = Arrays.equals(prefixStream.toByteArray(), pathStream.toByteArray());
+ } else {
+ prefixStream.write(pathStream.toByteArray(), 0, outputStream.size());
+ contains = Arrays.equals(prefixStream.toByteArray(), outputStream.toByteArray());
+ }
+ if (pathStream.size() == outputStream.size() && contains) {
+ this.objectMatchLevel = this.levelObject;
+ this.matched = true;
+ this.literal = false;
+ }
+ return contains;
+ }
+
+ public void itemsInArray() {
+ if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY && !this.arrayCounters.isEmpty()) {
+ boolean addCounter = levelArray - 1 < this.keysOrMembers.size() ? this.keysOrMembers.get(levelArray - 1)
+ : true;
+ if (addCounter) {
+ this.arrayCounters.set(levelArray - 1, this.arrayCounters.get(levelArray - 1) + 1);
+ this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - 1)));
+ } else {
+ Byte[] bool = { (byte) 0x2B, 0x01 };
+ this.allKeys.add(bool);
+ }
+ }
+ }
+
public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValueBuilder svb, int levelArray,
int levelObject) throws IOException {
abvsStack.get(0).reset();
@@ -189,12 +389,141 @@ public class JSONParser implements IParser {
} else if (tag == ValueTag.XS_INTEGER_TAG) {
out.writeLong(parser.getLongValue());
}
- if (itemStack.size() != 0) {
+ if (!itemStack.isEmpty()) {
if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) {
abStack.get(levelArray - 1).addItem(abvsStack.get(0));
+ if (valueSeq != null && this.matched && levelArray == this.arrayMatchLevel) {
+ this.literal = true;
+ this.matched = false;
+ writeElement(abvsStack.get(0));
+ }
} else if (itemStack.get(itemStack.size() - 1) == itemType.OBJECT) {
obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0));
+ if (valueSeq != null && this.matched && levelObject == this.objectMatchLevel) {
+ this.literal = true;
+ this.matched = false;
+ writeElement(abvsStack.get(0));
+ }
+ }
+ }
+ }
+
+ public void writeElement(ArrayBackedValueStorage abvs) throws IOException {
+ tempABVS.reset();
+ DataOutput out = tempABVS.getDataOutput();
+ out.write(abvs.getByteArray(), abvs.getStartOffset(), abvs.getLength());
+ FrameUtils.appendFieldToWriter(writer, appender, tempABVS.getByteArray(), tempABVS.getStartOffset(),
+ tempABVS.getLength());
+ }
+
+ public void startArrayOrObjects(int count) {
+ if (valueSeq != null && !this.arrayCounters.isEmpty()) {
+ boolean addCounter = levelArray - count < this.keysOrMembers.size()
+ ? this.keysOrMembers.get(levelArray - count) : true;
+ if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) {
+ if (addCounter) {
+ this.arrayCounters.set(levelArray - count, this.arrayCounters.get(levelArray - count) + 1);
+ this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - count)));
+ } else {
+ XDMConstants.setTrue(bp);
+ this.allKeys.add(ArrayUtils.toObject(bp.getByteArray()));
+ }
+ }
+
+ }
+ if (count == 2 && valueSeq != null) {
+ this.arrayCounters.add(Integer.valueOf(0));
+ }
+ }
+
+ public void startArray() throws HyracksDataException {
+ levelArray++;
+ if (levelArray > abStack.size()) {
+ abStack.add(new ArrayBuilder());
+ }
+ if (levelArray + levelObject > abvsStack.size() - 1) {
+ abvsStack.add(new ArrayBackedValueStorage());
+ }
+ startArrayOrObjects(2);
+ itemStack.add(itemType.ARRAY);
+ if (this.pathMatch() || this.valueSeq == null) {
+ abvsStack.get(levelArray + levelObject).reset();
+ try {
+ abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject));
+ } catch (Exception e) {
+ throw new HyracksDataException("Accessing index out of bounds", e);
+ }
+ }
+ }
+
+ public void startObject() throws HyracksDataException {
+ levelObject++;
+ if (levelObject > obStack.size()) {
+ obStack.add(new ObjectBuilder());
+ }
+ if (levelArray + levelObject > abvsStack.size() - 1) {
+ abvsStack.add(new ArrayBackedValueStorage());
+ }
+ startArrayOrObjects(1);
+ itemStack.add(itemType.OBJECT);
+ if (this.pathMatch() || this.valueSeq == null) {
+ abvsStack.get(levelArray + levelObject).reset();
+ try {
+ obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject));
+ } catch (Exception e) {
+ throw new HyracksDataException("Accessing index out of bounds", e);
}
}
}
+
+ public void startFieldName(JsonParser parser) throws HyracksDataException {
+ if (levelObject > spStack.size()) {
+ keyStack.add(new ArrayBackedValueStorage());
+ spStack.add(new UTF8StringPointable());
+ }
+ keyStack.get(levelObject - 1).reset();
+ DataOutput outk = keyStack.get(levelObject - 1).getDataOutput();
+ try {
+ svb.write(parser.getText(), outk);
+ spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1));
+ if (this.valueSeq != null) {
+ int length = 0;
+ byte[] barr = spStack.get(levelObject - 1).getByteArray();
+ outputStream.reset();
+ outputStream.write(barr, 0, spStack.get(levelObject - 1).getLength());
+ allKeys.add(ArrayUtils.toObject(outputStream.toByteArray()));
+ for (int i = 0; i < allKeys.size() - 1; i++) {
+ tvp.set(ArrayUtils.toPrimitive(allKeys.get(i)), 0, ArrayUtils.toPrimitive(allKeys.get(i)).length);
+ length += ArrayUtils.toPrimitive(allKeys.get(i)).length;
+ }
+ //if the next two bytes represent a boolean (boolean has only two bytes),
+ //it means that query asks for all the keys of the object
+ if (length <= pathStream.size() && (length + 2) <= pathStream.size()) {
+ tvp.set(pathStream.toByteArray(), length, length + 2);
+ if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) {
+ abvsStack.get(0).reset();
+ out.write(ValueTag.XS_STRING_TAG);
+ svb.write(parser.getText(), out);
+ writeElement(abvsStack.get(0));
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException("Writing in out of bounds space", e);
+ }
+ }
+
+ public void startAtomicValues(int tag, JsonParser parser) throws HyracksDataException {
+ itemsInArray();
+ if (this.pathMatch() || this.valueSeq == null) {
+ try {
+ atomicValues(tag, parser, out, svb, levelArray, levelObject);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (allKeys.size() - 1 >= 0) {
+ allKeys.remove(allKeys.size() - 1);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
index df6fb4b..2459944 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
@@ -18,37 +18,102 @@ package org.apache.vxquery.metadata;
import java.util.List;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-public abstract class AbstractVXQueryDataSource implements IDataSource<String> {
+public abstract class AbstractVXQueryDataSource implements IVXQueryDataSource {
protected static final String DELIMITER = "\\|";
protected int dataSourceId;
protected String collectionName;
protected String[] collectionPartitions;
- protected String elementPath;
+
protected List<Integer> childSeq;
+ protected List<Byte[]> valueSeq;
protected int totalDataSources;
protected String tag;
- protected String function;
protected Object[] types;
protected IDataSourcePropertiesProvider propProvider;
-
- public abstract String getFunctionCall();
+
+ @Override
+ public INodeDomain getDomain() {
+ return null;
+ }
@Override
public boolean isScanAccessPathALeaf() {
- // TODO Auto-generated method stub
return false;
}
+ public int getTotalDataSources() {
+ return totalDataSources;
+ }
+
+ public void setTotalDataSources(int totalDataSources) {
+ this.totalDataSources = totalDataSources;
+ }
+
+ public int getDataSourceId() {
+ return dataSourceId;
+ }
+
+ public int getPartitionCount() {
+ return collectionPartitions.length;
+ }
+
+ public String getTag() {
+ return this.tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
@Override
- public INodeDomain getDomain() {
- // TODO Auto-generated method stub
- return null;
+ public String getId() {
+ return collectionName;
+ }
+
+ @Override
+ public Object[] getSchemaTypes() {
+ return types;
+ }
+
+ @Override
+ public IDataSourcePropertiesProvider getPropertiesProvider() {
+ return propProvider;
+ }
+
+ @Override
+ public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
+ }
+
+ public void addChildSeq(int integer) {
+ childSeq.add(integer);
+ }
+
+ public List<Integer> getChildSeq() {
+ return childSeq;
+ }
+
+ public void addValueSeq(Byte[] value) {
+ valueSeq.add(value);
+ }
+
+ public List<Byte[]> getValueSeq() {
+ return valueSeq;
+ }
+
+ public String[] getPartitions() {
+ return collectionPartitions;
+ }
+
+ public void setPartitions(String[] collectionPartitions) {
+ this.collectionPartitions = collectionPartitions;
}
+ abstract public boolean usingIndex();
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java
new file mode 100644
index 0000000..8e71339
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.metadata;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+
+public interface IVXQueryDataSource extends IDataSource<String> {
+ boolean usingIndex();
+
+ void addChildSeq(int integer);
+
+ List<Integer> getChildSeq();
+}
+
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index bee7c7b..c5761c5 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
-import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
@@ -29,95 +28,38 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain;
public class VXQueryCollectionDataSource extends AbstractVXQueryDataSource {
-
- private VXQueryCollectionDataSource(int id, String file, Object[] types) {
+ private VXQueryCollectionDataSource(int id, String collection, Object[] types) {
this.dataSourceId = id;
- this.collectionName = file;
- collectionPartitions = collectionName.split(DELIMITER);
+ this.collectionName = collection;
+ this.collectionPartitions = collectionName.split(DELIMITER);
this.types = types;
+
final IPhysicalPropertiesVector vec = new StructuralPropertiesVector(
new RandomPartitioningProperty(new CollectionFileDomain(collectionName)),
new ArrayList<ILocalStructuralProperty>());
- propProvider = new IDataSourcePropertiesProvider() {
+ this.propProvider = new IDataSourcePropertiesProvider() {
@Override
public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
return vec;
}
};
- this.childSeq = new ArrayList<>();
this.tag = null;
+ this.childSeq = new ArrayList<>();
+ this.valueSeq = new ArrayList<>();
}
- public static VXQueryCollectionDataSource create(int id, String file, Object type) {
- return new VXQueryCollectionDataSource(id, file, new Object[] { type });
- }
-
- public int getTotalDataSources() {
- return totalDataSources;
- }
-
- public void setTotalDataSources(int totalDataSources) {
- this.totalDataSources = totalDataSources;
- }
-
- public int getDataSourceId() {
- return dataSourceId;
- }
-
- public String[] getPartitions() {
- return collectionPartitions;
- }
-
- public void setPartitions(String[] collectionPartitions) {
- this.collectionPartitions = collectionPartitions;
- }
-
- public int getPartitionCount() {
- return collectionPartitions.length;
- }
-
- public String getTag() {
- return this.tag;
- }
-
- public void setTag(String tag) {
- this.tag = tag;
- }
-
- @Override
- public String getId() {
- return collectionName;
- }
-
- @Override
- public Object[] getSchemaTypes() {
- return types;
- }
-
- @Override
- public IDataSourcePropertiesProvider getPropertiesProvider() {
- return propProvider;
- }
-
- @Override
- public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
- }
-
- public void addChildSeq(int integer) {
- childSeq.add(integer);
- }
-
- public List<Integer> getChildSeq() {
- return childSeq;
+ public static VXQueryCollectionDataSource create(int id, String collection, Object type) {
+ return new VXQueryCollectionDataSource(id, collection, new Object[] { type });
}
@Override
public String toString() {
- return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq + "]";
+ return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq
+ + ", valueSeq=" + valueSeq + "]";
}
- @Override
- public String getFunctionCall() {
- return function;
+ public boolean usingIndex() {
+ return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index be95f93..623b48c 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -77,6 +77,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
private short totalDataSources;
private String[] collectionPartitions;
private List<Integer> childSeq;
+ private List<Byte[]> valueSeq;
protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
private HDFSFunctions hdfs;
private String tag;
@@ -84,13 +85,14 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
private final String hdfsConf;
private final Map<String, NodeControllerInfo> nodeControllerInfos;
- public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds,
+ public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, AbstractVXQueryDataSource ds,
RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
super(spec, 1, 1);
collectionPartitions = ds.getPartitions();
dataSourceId = (short) ds.getDataSourceId();
totalDataSources = (short) ds.getTotalDataSources();
childSeq = ds.getChildSeq();
+ valueSeq = ds.getValueSeq();
recordDescriptors[0] = rDesc;
this.tag = ds.getTag();
this.hdfsConf = hdfsConf;
@@ -113,7 +115,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
final String collectionName = collectionPartitions[partition % collectionPartitions.length];
final XMLParser parser = new XMLParser(false, nodeIdProvider, nodeId, appender, childSeq,
dCtx.getStaticContext());
- final JSONParser jparser = new JSONParser();
+ final JSONParser jparser = new JSONParser(valueSeq);
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
@Override
@@ -130,7 +132,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
Reader input;
if (!collectionModifiedName.contains("hdfs:/")) {
File collectionDirectory = new File(collectionModifiedName);
- //check if directory is in the local file system
+ // check if directory is in the local file system
if (collectionDirectory.exists()) {
// Go through each tuple.
if (collectionDirectory.isDirectory()) {
@@ -152,9 +154,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
try {
jsonAbvs.reset();
input = new InputStreamReader(new FileInputStream(file));
- jparser.parse(input, jsonAbvs);
- FrameUtils.appendFieldToWriter(writer, appender, jsonAbvs.getByteArray(),
- jsonAbvs.getStartOffset(), jsonAbvs.getLength());
+ jparser.parse(input, jsonAbvs, writer, appender);
} catch (FileNotFoundException e) {
throw new HyracksDataException(e.toString());
}
@@ -197,14 +197,14 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
RecordReader reader;
TaskAttemptContext context;
for (int i = 0; i < size; i++) {
- //read split
+ // read split
context = ctxFactory.createContext(job.getConfiguration(), i);
-
reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
while (reader.nextKeyValue()) {
value = reader.getCurrentValue().toString();
- //Split value if it contains more than one item with the tag
+ // Split value if it contains more than
+ // one item with the tag
if (StringUtils.countMatches(value, tag) > 1) {
String[] items = value.split(tag);
for (String item : items) {
@@ -218,7 +218,9 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
}
} else {
value = START_TAG + value;
- //create an input stream to the file currently reading and send it to parser
+ // create an input stream to the
+ // file currently reading and send
+ // it to parser
stream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
parser.parseHDFSElements(stream, writer, fta, i);
stream.close();
@@ -232,10 +234,10 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
}
} else {
try {
- //check if the path exists and is a directory
+ // check if the path exists and is a directory
if (fs.exists(directory) && fs.isDirectory(directory)) {
for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
- //read every file in the directory
+ // read every file in the directory
RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
while (it.hasNext()) {
xmlDocument = it.next().getPath();
@@ -244,7 +246,9 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
LOGGER.fine(
"Starting to read XML document: " + xmlDocument.getName());
}
- //create an input stream to the file currently reading and send it to parser
+ // create an input stream to the
+ // file currently reading and
+ // send it to parser
InputStream in = fs.open(xmlDocument).getWrappedStream();
parser.parseHDFSElements(in, writer, fta, tupleIndex);
in.close();
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
index ddbc984..ea69cfd 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
@@ -31,9 +31,9 @@ import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain;
*/
public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
- protected Object[] types;
+ private String elementPath;
+ private String function;
- protected IDataSourcePropertiesProvider propProvider;
private VXQueryIndexingDataSource(int id, String collection, String elementPath, Object[] types,
String functionCall) {
this.dataSourceId = id;
@@ -41,12 +41,11 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
this.elementPath = elementPath;
this.function = functionCall;
this.collectionPartitions = collectionName.split(DELIMITER);
-
this.types = types;
+
final IPhysicalPropertiesVector vec = new StructuralPropertiesVector(
- new RandomPartitioningProperty(new CollectionFileDomain(collectionName)),
- new ArrayList<>());
- propProvider = new IDataSourcePropertiesProvider() {
+ new RandomPartitioningProperty(new CollectionFileDomain(collectionName)), new ArrayList<>());
+ this.propProvider = new IDataSourcePropertiesProvider() {
@Override
public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
return vec;
@@ -54,83 +53,30 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
};
this.tag = null;
this.childSeq = new ArrayList<>();
+ this.valueSeq = new ArrayList<>();
}
- public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type, String
- function) {
+ public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type,
+ String function) {
return new VXQueryIndexingDataSource(id, collection, index, new Object[] { type }, function);
}
- public int getTotalDataSources() {
- return totalDataSources;
- }
-
- public void setTotalDataSources(int totalDataSources) {
- this.totalDataSources = totalDataSources;
- }
-
- public int getDataSourceId() {
- return dataSourceId;
- }
-
public String getElementPath() {
return elementPath;
}
- public String[] getCollectionPartitions() {
- return collectionPartitions;
- }
-
- public void setCollectionPartitions(String[] collectionPartitions) {
- this.collectionPartitions = collectionPartitions;
- }
-
- public int getPartitionCount() {
- return collectionPartitions.length;
- }
-
- public String getTag() {
- return this.tag;
- }
-
- public void setTag(String tag) {
- this.tag = tag;
- }
-
- @Override
- public String getId() {
- return collectionName;
- }
-
- @Override
- public Object[] getSchemaTypes() {
- return types;
- }
-
- @Override
- public IDataSourcePropertiesProvider getPropertiesProvider() {
- return propProvider;
- }
-
- @Override
- public void computeFDs(List scanVariables, List fdList) {
+ public String getFunctionCall() {
+ return function;
}
@Override
public String toString() {
- return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath + " "
- + "function=" + function + "]";
+ return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath
+ + ", function=" + function + "]";
}
- @Override
- public String getFunctionCall() {
- return function;
- }
-
- public List<Integer> getChildSeq() {
- return childSeq;
+ public boolean usingIndex() {
+ return true;
}
}
-
-
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
index a24a629..ac92a0e 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
@@ -68,7 +68,7 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe
RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
super(spec, 1, 1);
this.functionCall = ds.getFunctionCall();
- collectionPartitions = ds.getCollectionPartitions();
+ collectionPartitions = ds.getPartitions();
dataSourceId = (short) ds.getDataSourceId();
totalDataSources = (short) ds.getTotalDataSources();
recordDescriptors[0] = rDesc;
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index e552f68..f6644d6 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -88,43 +88,32 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
- throws AlgebricksException {
- VXQueryCollectionDataSource ds = null;
- VXQueryIndexingDataSource ids = null;
-
- try {
- ids = (VXQueryIndexingDataSource) dataSource;
- } catch (ClassCastException e) {
- ds = (VXQueryCollectionDataSource) dataSource;
- }
+ throws AlgebricksException {
+ AbstractVXQueryDataSource ds = (AbstractVXQueryDataSource) dataSource;
if (sourceFileMap != null) {
- final int len = ds != null ? ds.getPartitions().length : ids.getCollectionPartitions().length;
+ final int len = ds.getPartitions().length;
String[] collectionPartitions = new String[len];
for (int i = 0; i < len; ++i) {
- String partition = ds != null ? ds.getPartitions()[i] : ids.getCollectionPartitions()[i];
+ String partition = ds.getPartitions()[i];
File mapped = sourceFileMap.get(partition);
collectionPartitions[i] = mapped != null ? mapped.toString() : partition;
}
- if (ds != null) {
- ds.setPartitions(collectionPartitions);
- } else {
- ids.setCollectionPartitions(collectionPartitions);
- }
+ ds.setPartitions(collectionPartitions);
}
RecordDescriptor rDesc;
IOperatorDescriptor scanner;
AlgebricksPartitionConstraint constraint;
- if (ds != null) {
+ if (!ds.usingIndex()) {
rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf,
this.nodeControllerInfos);
constraint = getClusterLocations(nodeList, ds.getPartitionCount());
} else {
rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
- scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, ids, rDesc, this.hdfsConf,
- this.nodeControllerInfos);
- constraint = getClusterLocations(nodeList, ids.getPartitionCount());
+ scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, (VXQueryIndexingDataSource) ds, rDesc,
+ this.hdfsConf, this.nodeControllerInfos);
+ constraint = getClusterLocations(nodeList, ds.getPartitionCount());
}
return new Pair<>(scanner, constraint);
@@ -245,7 +234,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -254,7 +243,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -263,7 +252,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobSpecification jobSpec) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -274,10 +263,11 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobSpecification spec) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Map<String, String> getConfig() {
return new HashMap<>();
}
+
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
index 4117774..a8be359 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
@@ -16,13 +16,17 @@
*/
package org.apache.vxquery.runtime.functions.base;
+import org.apache.vxquery.datamodel.accessors.ArrayBackedValueStoragePool;
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.PointablePoolFactory;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.exceptions.SystemException;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.vxquery.datamodel.accessors.PointablePool;
-import org.apache.vxquery.datamodel.accessors.PointablePoolFactory;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
public abstract class AbstractTaggedValueArgumentUnnestingEvaluator implements IUnnestingEvaluator {
private final IScalarEvaluator[] args;
@@ -30,6 +34,7 @@ public abstract class AbstractTaggedValueArgumentUnnestingEvaluator implements I
protected final TaggedValuePointable[] tvps;
protected final PointablePool ppool = PointablePoolFactory.INSTANCE.createPointablePool();
+ protected final ArrayBackedValueStoragePool abvsPool = new ArrayBackedValueStoragePool();
public AbstractTaggedValueArgumentUnnestingEvaluator(IScalarEvaluator[] args) {
this.args = args;
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
index d255345..b127d2a 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
@@ -21,10 +21,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.accessors.jsonitem.ArrayPointable;
-import org.apache.vxquery.datamodel.accessors.jsonitem.ObjectPointable;
import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
-import org.apache.vxquery.datamodel.values.ValueTag;
import org.apache.vxquery.exceptions.ErrorCode;
import org.apache.vxquery.exceptions.SystemException;
import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator;
@@ -33,42 +30,32 @@ import java.io.IOException;
public class KeysOrMembersScalarEvaluator extends AbstractTaggedValueArgumentScalarEvaluator {
protected final IHyracksTaskContext ctx;
- private final ObjectPointable op;
- private final ArrayPointable ap;
+ private ArrayBackedValueStorage abvs;
private final SequenceBuilder sb;
private final TaggedValuePointable tempTvp;
+ private final KeysOrMembersUnnesting keysOrMembers;
public KeysOrMembersScalarEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) {
super(args);
this.ctx = ctx;
- op = (ObjectPointable) ObjectPointable.FACTORY.createPointable();
- ap = (ArrayPointable) ArrayPointable.FACTORY.createPointable();
+ abvs = new ArrayBackedValueStorage();
sb = new SequenceBuilder();
tempTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ keysOrMembers = new KeysOrMembersUnnesting(ctx, ppool);
}
@Override
protected void evaluate(TaggedValuePointable[] args, IPointable result) throws SystemException {
- final TaggedValuePointable tvp = args[0];
- ArrayBackedValueStorage abvs = abvsPool.takeOne();
+ abvs = abvsPool.takeOne();
+ keysOrMembers.init(args);
try {
- switch (tvp.getTag()) {
- case ValueTag.OBJECT_TAG:
- tvp.getValue(op);
- op.getKeys(abvs);
- result.set(abvs);
- break;
- case ValueTag.ARRAY_TAG:
- abvs.reset();
- sb.reset(abvs);
- tvp.getValue(ap);
- ap.appendItems(sb);
- sb.finish();
- result.set(abvs);
- break;
- default:
- throw new SystemException(ErrorCode.FORG0006);
+ abvs.reset();
+ sb.reset(abvs);
+ while (keysOrMembers.step(tempTvp)) {
+ sb.addItem(tempTvp);
}
+ sb.finish();
+ result.set(abvs);
} catch (IOException e) {
throw new SystemException(ErrorCode.SYSE0001, e);
} finally {
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java
new file mode 100644
index 0000000..0e2597c
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.vxquery.runtime.functions.json;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.vxquery.datamodel.accessors.ArrayBackedValueStoragePool;
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.SequencePointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.accessors.jsonitem.ArrayPointable;
+import org.apache.vxquery.datamodel.accessors.jsonitem.ObjectPointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.step.AbstractForwardAxisPathStep;
+
+public class KeysOrMembersUnnesting extends AbstractForwardAxisPathStep {
+ private final ArrayPointable ap = (ArrayPointable) ArrayPointable.FACTORY.createPointable();
+ private final SequencePointable sp = (SequencePointable) SequencePointable.FACTORY.createPointable();
+ private final ObjectPointable op = (ObjectPointable) ObjectPointable.FACTORY.createPointable();
+ private final TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ private final TaggedValuePointable tempTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ private TaggedValuePointable arg = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ private final ArrayBackedValueStoragePool abvsPool = new ArrayBackedValueStoragePool();
+ private ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+ private int arOrObArgsLength;
+ private int indexArrayArgs;
+
+ public KeysOrMembersUnnesting(IHyracksTaskContext ctx, PointablePool pp) {
+ super(ctx, pp);
+ }
+
+ protected void init(TaggedValuePointable[] args) throws SystemException {
+ abvs = abvsPool.takeOne();
+ indexArrayArgs = 0;
+ arg = args[0];
+ switch (arg.getTag()) {
+ case ValueTag.OBJECT_TAG:
+ arg.getValue(op);
+ arOrObArgsLength = op.getEntryCount();
+ break;
+ case ValueTag.ARRAY_TAG:
+ arg.getValue(ap);
+ arOrObArgsLength = ap.getEntryCount();
+ break;
+ default:
+ throw new SystemException(ErrorCode.FORG0006);
+ }
+ }
+
+ public boolean step(IPointable result) throws SystemException {
+ abvs = abvsPool.takeOne();
+ if (arOrObArgsLength > 0) {
+ while (indexArrayArgs < arOrObArgsLength) {
+ if (arg.getTag() == ValueTag.ARRAY_TAG) {
+ ap.getEntry(indexArrayArgs, tvp);
+ } else {
+ try {
+ op.getKeys(abvs);
+ } catch (IOException e) {
+ throw new SystemException(ErrorCode.SYSE0001, e);
+ }
+ tempTvp.set(abvs);
+ tempTvp.getValue(sp);
+ sp.getEntry(indexArrayArgs, tvp);
+ }
+ result.set(tvp.getByteArray(), tvp.getStartOffset(), tvp.getLength());
+ indexArrayArgs++;
+ return true;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java
new file mode 100644
index 0000000..a4e146c
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.vxquery.runtime.functions.json;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluator;
+
+public class KeysOrMembersUnnestingEvaluator extends AbstractTaggedValueArgumentUnnestingEvaluator {
+ private final KeysOrMembersUnnesting keysOrMembersStep;
+
+ public KeysOrMembersUnnestingEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) {
+ super(args);
+ keysOrMembersStep = new KeysOrMembersUnnesting(ctx, ppool);
+ }
+
+ @Override
+ public boolean step(IPointable result) throws HyracksDataException {
+ return keysOrMembersStep.step(result);
+ }
+
+ @Override
+ protected void init(TaggedValuePointable[] args) throws HyracksDataException {
+ keysOrMembersStep.init(args);
+
+ }
+}