You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by ch...@apache.org on 2017/05/22 18:11:56 UTC

[2/3] vxquery git commit: VXQUERY-105: Add group-by functionality, Add scalability to JSON parser

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
new file mode 100644
index 0000000..1d8a55d
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.compiler.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.vxquery.compiler.rewriter.rules.util.ExpressionToolbox;
+import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.functions.BuiltinOperators;
+import org.apache.vxquery.metadata.IVXQueryDataSource;
+import org.apache.vxquery.metadata.VXQueryCollectionDataSource;
+
+/**
+ * The rule searches for an assign operator immediately following a data scan
+ * operator.
+ *
+ * <pre>
+ * Before
+ *
+ *   plan__parent
+ *   ASSIGN( $v2 : value( $v1, constant) )
+ *   DATASCAN( $source : $v1 )
+ *   plan__child
+ *
+ *   Where $v1 is not used in plan__parent.
+ *
+ * After
+ *
+ *   plan__parent
+ *   ASSIGN( $v2 : $v1 )
+ *   DATASCAN( $source : $v1 )
+ *   plan__child
+ *
+ *   $source is encoded with the value parameters.
+ * </pre>
+ */
+
+public class PushValueIntoDatascanRule extends AbstractPushExpressionIntoDatascanRule {
+
+    @Override
+    boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
+        VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) datasource;
+        boolean added = false;
+        List<Mutable<ILogicalExpression>> finds = new ArrayList<Mutable<ILogicalExpression>>();
+        ILogicalExpression le = expression.getValue();
+        if (le.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) le;
+            if (afce.getFunctionIdentifier().equals(BuiltinFunctions.FN_ZERO_OR_ONE_1.getFunctionIdentifier())) {
+                return false;
+            }
+        }
+        ExpressionToolbox.findAllFunctionExpressions(expression, BuiltinOperators.VALUE.getFunctionIdentifier(), finds);
+
+        for (int i = finds.size(); i > 0; --i) {
+            Byte[] value = null;
+            List<ILogicalExpression> values = ExpressionToolbox.getFullArguments(finds.get(i - 1));
+            if (values.size() > 1) {
+                value = ExpressionToolbox.getConstantArgument(finds.get(i - 1), 1);
+                ds.addValueSeq(value);
+                added = true;
+            }
+        }
+
+        return added;
+    }
+
+    @Override
+    LogicalOperatorTag getOperator() {
+        return LogicalOperatorTag.ASSIGN;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
index 69940ad..d86de98 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
@@ -94,12 +95,14 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         boolean operatorChanged = false;
         // Do not process empty or nested tuple source.
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
@@ -151,7 +154,9 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
                 }
             }
         }
-
+        if (operatorChanged) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
         // Now with the new operator, update the variable mappings.
         cardinalityVariable = CardinalityRuleToolbox.updateCardinalityVariable(op, cardinalityVariable, vxqueryContext);
         updateVariableMap(op, cardinalityVariable, documentOrderVariables, uniqueNodesVariables, vxqueryContext);
@@ -178,8 +183,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
             return 0;
         }
         AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;
-        if (!functionCall.getFunctionIdentifier().equals(
-                BuiltinOperators.SORT_DISTINCT_NODES_ASC_OR_ATOMICS.getFunctionIdentifier())) {
+        if (!functionCall.getFunctionIdentifier()
+                .equals(BuiltinOperators.SORT_DISTINCT_NODES_ASC_OR_ATOMICS.getFunctionIdentifier())) {
             return 0;
         }
 
@@ -314,7 +319,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
      * @param uniqueNodesVariables
      * @param uniqueNodes
      */
-    private void resetUniqueNodesVariables(HashMap<Integer, UniqueNodes> uniqueNodesVariables, UniqueNodes uniqueNodes) {
+    private void resetUniqueNodesVariables(HashMap<Integer, UniqueNodes> uniqueNodesVariables,
+            UniqueNodes uniqueNodes) {
         for (Entry<Integer, UniqueNodes> entry : uniqueNodesVariables.entrySet()) {
             uniqueNodesVariables.put(entry.getKey(), uniqueNodes);
         }
@@ -349,8 +355,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
             case ASSIGN:
                 AssignOperator assign = (AssignOperator) op;
                 for (int index = 0; index < assign.getExpressions().size(); index++) {
-                    ILogicalExpression assignLogicalExpression = (ILogicalExpression) assign.getExpressions()
-                            .get(index).getValue();
+                    ILogicalExpression assignLogicalExpression = (ILogicalExpression) assign.getExpressions().get(index)
+                            .getValue();
                     variableId = assign.getVariables().get(index).getId();
                     documentOrder = propagateDocumentOrder(assignLogicalExpression, documentOrderVariablesForOperator);
                     uniqueNodes = propagateUniqueNodes(assignLogicalExpression, uniqueNodesVariablesForOperator);
@@ -384,8 +390,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
                 // Find the last operator to set a variable and call this function again.
                 SubplanOperator subplan = (SubplanOperator) op;
                 for (int index = 0; index < subplan.getNestedPlans().size(); index++) {
-                    AbstractLogicalOperator lastOperator = (AbstractLogicalOperator) subplan.getNestedPlans()
-                            .get(index).getRoots().get(0).getValue();
+                    AbstractLogicalOperator lastOperator = (AbstractLogicalOperator) subplan.getNestedPlans().get(index)
+                            .getRoots().get(0).getValue();
                     updateVariableMap(lastOperator, cardinalityVariable, documentOrderVariables, uniqueNodesVariables,
                             vxqueryContext);
                 }
@@ -395,8 +401,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
                 UnnestOperator unnest = (UnnestOperator) op;
                 ILogicalExpression unnestLogicalExpression = (ILogicalExpression) unnest.getExpressionRef().getValue();
                 variableId = unnest.getVariables().get(0).getId();
-                Cardinality inputCardinality = vxqueryContext.getCardinalityOperatorMap(op.getInputs().get(0)
-                        .getValue());
+                Cardinality inputCardinality = vxqueryContext
+                        .getCardinalityOperatorMap(op.getInputs().get(0).getValue());
                 documentOrder = propagateDocumentOrder(unnestLogicalExpression, documentOrderVariablesForOperator);
                 uniqueNodes = propagateUniqueNodes(unnestLogicalExpression, uniqueNodesVariablesForOperator);
 
@@ -425,10 +431,12 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
                 break;
 
             // The following operators do not change or add to the variable map.
+
             case DATASOURCESCAN:
             case DISTRIBUTE_RESULT:
             case EMPTYTUPLESOURCE:
             case EXCHANGE:
+            case GROUP:
             case NESTEDTUPLESOURCE:
             case PROJECT:
             case SELECT:
@@ -438,8 +446,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule
 
             // The following operators' analysis has not yet been implemented.
             default:
-                throw new RuntimeException("Operator (" + op.getOperatorTag()
-                        + ") has not been implemented in rewrite rule.");
+                throw new RuntimeException(
+                        "Operator (" + op.getOperatorTag() + ") has not been implemented in rewrite rule.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
index 82be94c..3fb2696 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java
@@ -28,11 +28,13 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
  * Set the default context for the variable id in the optimization context.
+ * 
  * @author prestonc
  */
 public class SetVariableIdContextRule implements IAlgebraicRewriteRule {
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         if (context.checkIfInDontApplySet(this, opRef.getValue())) {
             return false;
         }
@@ -44,7 +46,9 @@ public class SetVariableIdContextRule implements IAlgebraicRewriteRule {
             case ASSIGN:
             case AGGREGATE:
                 AbstractAssignOperator assign = (AbstractAssignOperator) op;
-                variableId = assign.getVariables().get(0).getId();
+                if (assign.getVariables().size() > 0) {
+                    variableId = assign.getVariables().get(0).getId();
+                }
                 break;
             case UNNEST:
                 UnnestOperator unnest = (UnnestOperator) op;
@@ -62,7 +66,8 @@ public class SetVariableIdContextRule implements IAlgebraicRewriteRule {
     }
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
index 0dd3b31..50bc07e 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java
@@ -16,8 +16,10 @@
  */
 package org.apache.vxquery.compiler.rewriter.rules.util;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
 import org.apache.vxquery.context.StaticContext;
@@ -211,6 +213,27 @@ public class ExpressionToolbox {
         return pTypeCode.getInteger();
     }
 
+    public static Byte[] getConstantArgument(Mutable<ILogicalExpression> searchM, int arg) {
+        AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue();
+        ILogicalExpression argType = searchFunction.getArguments().get(arg).getValue();
+        searchFunction.getArguments().size();
+        if (argType.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            return null;
+        }
+        TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+        ExpressionToolbox.getConstantAsPointable((ConstantExpression) argType, tvp);
+        return ArrayUtils.toObject(tvp.getByteArray());
+    }
+
+    public static List<ILogicalExpression> getFullArguments(Mutable<ILogicalExpression> searchM) {
+        AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue();
+        ArrayList<ILogicalExpression> args = new ArrayList<ILogicalExpression>();
+        for (int i = 0; i < searchFunction.getArguments().size(); i++) {
+            args.add(searchFunction.getArguments().get(i).getValue());
+        }
+        return args;
+    }
+
     public static SequenceType getTypeExpressionTypeArgument(Mutable<ILogicalExpression> searchM, StaticContext dCtx) {
         int typeId = getTypeExpressionTypeArgument(searchM);
         if (typeId > 0) {

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
index 2c57c32..94040d2 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java
@@ -20,7 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java b/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
index a3f7bf8..886229d 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java
@@ -51,6 +51,11 @@ public class SystemException extends HyracksDataException {
         super(message(code, loc));
         this.code = code;
     }
+    
+    public SystemException(ErrorCode code, SourceLocation loc, Throwable cause) {
+        super(message(code, loc), cause);
+        this.code = code;
+    }
 
     public ErrorCode getCode() {
         return code;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
index 0b03c34..ec27864 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
@@ -660,6 +660,7 @@
         <param name="expr" type="json-item()"/>
         <return type="xs:string*"/>
         <runtime type="scalar" class="org.apache.vxquery.runtime.functions.json.KeysOrMembersScalarEvaluatorFactory"/>
+        <runtime type="unnesting" class="org.apache.vxquery.runtime.functions.json.KeysOrMembersUnnestingEvaluatorFactory"/>
     </operator>
 
     <!-- opext:subtract($arg1 as xs:anyAtomicType?, $arg2 as xs:anyAtomicType?) as xs:anyAtomicType? -->

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
index ad8db4e..edf8dbc 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java
@@ -1,43 +1,54 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+* http://www.apache.org/licenses/LICENSE-2.0
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.vxquery.jsonparser;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Reader;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.htrace.fasterxml.jackson.core.JsonFactory;
+import org.apache.htrace.fasterxml.jackson.core.JsonParser;
+import org.apache.htrace.fasterxml.jackson.core.JsonToken;
+import org.apache.hyracks.api.comm.IFrameFieldAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
 import org.apache.vxquery.datamodel.builders.atomic.StringValueBuilder;
 import org.apache.vxquery.datamodel.builders.jsonitem.ArrayBuilder;
 import org.apache.vxquery.datamodel.builders.jsonitem.ObjectBuilder;
 import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
 import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.datamodel.values.XDMConstants;
 import org.apache.vxquery.xmlparser.IParser;
 
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-
 public class JSONParser implements IParser {
-
     final JsonFactory factory;
+    final List<Byte[]> valueSeq;
     protected final ArrayBackedValueStorage atomic;
+    private TaggedValuePointable tvp;
+    private BooleanPointable bp;
     protected final List<ArrayBuilder> abStack;
     protected final List<ObjectBuilder> obStack;
     protected final List<ArrayBackedValueStorage> abvsStack;
@@ -48,6 +59,16 @@ public class JSONParser implements IParser {
     protected final DataOutput out;
     protected itemType checkItem;
     protected int levelArray, levelObject;
+    protected final List<Byte[]> allKeys;
+    protected ByteArrayOutputStream outputStream, prefixStream, pathStream;
+    protected int objectMatchLevel;
+    protected int arrayMatchLevel;
+    protected boolean matched, literal;
+    protected ArrayBackedValueStorage tempABVS;
+    protected List<Integer> arrayCounters;
+    protected List<Boolean> keysOrMembers;
+    protected IFrameWriter writer;
+    protected IFrameFieldAppender appender;
 
     enum itemType {
         ARRAY,
@@ -57,8 +78,14 @@ public class JSONParser implements IParser {
     protected final List<itemType> itemStack;
 
     public JSONParser() {
+        this(null);
+    }
+
+    public JSONParser(List<Byte[]> valueSeq) {
         factory = new JsonFactory();
+        this.valueSeq = valueSeq;
         atomic = new ArrayBackedValueStorage();
+        tvp = new TaggedValuePointable();
         abStack = new ArrayList<ArrayBuilder>();
         obStack = new ArrayList<ObjectBuilder>();
         abvsStack = new ArrayList<ArrayBackedValueStorage>();
@@ -67,9 +94,59 @@ public class JSONParser implements IParser {
         itemStack = new ArrayList<itemType>();
         svb = new StringValueBuilder();
         sb = new SequenceBuilder();
+        bp = new BooleanPointable();
+        allKeys = new ArrayList<Byte[]>();
         abvsStack.add(atomic);
         out = abvsStack.get(abvsStack.size() - 1).getDataOutput();
+        tempABVS = new ArrayBackedValueStorage();
+        this.objectMatchLevel = 1;
+        this.arrayMatchLevel = 0;
+        matched = false;
+        literal = false;
+        arrayCounters = new ArrayList<Integer>();
+        outputStream = new ByteArrayOutputStream();
+        prefixStream = new ByteArrayOutputStream();
+        pathStream = new ByteArrayOutputStream();
+        this.keysOrMembers = new ArrayList<Boolean>();
+        outputStream.reset();
+        pathStream.reset();
+        if (valueSeq != null) {
+            for (int i = 0; i < this.valueSeq.size(); i++) {
+                tvp.set(ArrayUtils.toPrimitive(valueSeq.get(i)), 0, ArrayUtils.toPrimitive(valueSeq.get(i)).length);
+                //access an item of an array
+                if (tvp.getTag() == ValueTag.XS_INTEGER_TAG) {
+                    pathStream.write(tvp.getByteArray(), 0, tvp.getLength());
+                    this.arrayMatchLevel++;
+                    this.keysOrMembers.add(Boolean.valueOf(true));
+                    //access all the items of an array or
+                    //all the keys of an object
+                } else if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) {
+                    pathStream.write(tvp.getByteArray(), 0, tvp.getLength());
+                    this.arrayMatchLevel++;
+                    this.keysOrMembers.add(Boolean.valueOf(false));
+                    //access an object 
+                } else {
+                    pathStream.write(tvp.getByteArray(), 1, tvp.getLength() - 1);
+                }
+            }
+        }
+    }
 
+    Byte[] toBytes(Integer v) {
+        Byte[] barr = ArrayUtils.toObject(ByteBuffer.allocate(9).putLong(1, v).array());
+        barr[0] = ValueTag.XS_INTEGER_TAG;
+        return barr;
+    }
+
+    public int parse(Reader input, ArrayBackedValueStorage result, IFrameWriter writer, IFrameFieldAppender appender)
+            throws HyracksDataException {
+        this.writer = writer;
+        this.appender = appender;
+        if (this.valueSeq != null) {
+            return parseElements(input, result);
+        } else {
+            return parse(input, result);
+        }
     }
 
     public int parse(Reader input, ArrayBackedValueStorage result) throws HyracksDataException {
@@ -79,7 +156,6 @@ public class JSONParser implements IParser {
             JsonParser parser = factory.createParser(input);
             JsonToken token = parser.nextToken();
             checkItem = null;
-
             levelArray = 0;
             levelObject = 0;
             sb.reset(result);
@@ -89,47 +165,22 @@ public class JSONParser implements IParser {
                 }
                 switch (token) {
                     case START_ARRAY:
-                        levelArray++;
-                        if (levelArray > abStack.size()) {
-                            abStack.add(new ArrayBuilder());
-                        }
-                        if (levelArray + levelObject > abvsStack.size() - 1) {
-                            abvsStack.add(new ArrayBackedValueStorage());
-                        }
-                        itemStack.add(itemType.ARRAY);
-                        abvsStack.get(levelArray + levelObject).reset();
-                        abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject));
+                        startArray();
                         break;
                     case START_OBJECT:
-                        levelObject++;
-                        if (levelObject > obStack.size()) {
-                            obStack.add(new ObjectBuilder());
-                        }
-                        if (levelArray + levelObject > abvsStack.size() - 1) {
-                            abvsStack.add(new ArrayBackedValueStorage());
-                        }
-                        itemStack.add(itemType.OBJECT);
-                        abvsStack.get(levelArray + levelObject).reset();
-                        obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject));
+                        startObject();
                         break;
                     case FIELD_NAME:
-                        if (levelObject > spStack.size()) {
-                            keyStack.add(new ArrayBackedValueStorage());
-                            spStack.add(new UTF8StringPointable());
-                        }
-                        keyStack.get(levelObject - 1).reset();
-                        DataOutput outk = keyStack.get(levelObject - 1).getDataOutput();
-                        svb.write(parser.getText(), outk);
-                        spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1));
+                        startFieldName(parser);
                         break;
                     case VALUE_NUMBER_INT:
-                        atomicValues(ValueTag.XS_INTEGER_TAG, parser, out, svb, levelArray, levelObject);
+                        startAtomicValues(ValueTag.XS_INTEGER_TAG, parser);
                         break;
                     case VALUE_STRING:
-                        atomicValues(ValueTag.XS_STRING_TAG, parser, out, svb, levelArray, levelObject);
+                        startAtomicValues(ValueTag.XS_STRING_TAG, parser);
                         break;
                     case VALUE_NUMBER_FLOAT:
-                        atomicValues(ValueTag.XS_DOUBLE_TAG, parser, out, svb, levelArray, levelObject);
+                        startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser);
                         break;
                     case END_ARRAY:
                         abStack.get(levelArray - 1).finish();
@@ -173,11 +224,160 @@ public class JSONParser implements IParser {
             sb.finish();
             outResult.write(result.getByteArray());
         } catch (Exception e) {
-            throw new HyracksDataException(e.toString());
+            throw new HyracksDataException("Accessing or writing in out of bounds space", e);
+        }
+        return items;
+    }
+
+    public int parseElements(Reader input, ArrayBackedValueStorage result) throws HyracksDataException {
+        int items = 0;
+        try {
+            JsonParser parser = factory.createParser(input);
+            JsonToken token = parser.nextToken();
+            checkItem = null;
+
+            this.objectMatchLevel = 0;
+            this.matched = false;
+
+            levelArray = 0;
+            levelObject = 0;
+            sb.reset(result);
+            while (token != null) {
+                if (itemStack.size() > 1) {
+                    checkItem = itemStack.get(itemStack.size() - 2);
+                }
+                switch (token) {
+                    case START_ARRAY:
+                        startArray();
+                        break;
+                    case START_OBJECT:
+                        startObject();
+                        break;
+                    case FIELD_NAME:
+                        startFieldName(parser);
+                        break;
+                    case VALUE_NUMBER_INT:
+                        startAtomicValues(ValueTag.XS_INTEGER_TAG, parser);
+                        break;
+                    case VALUE_STRING:
+                        startAtomicValues(ValueTag.XS_STRING_TAG, parser);
+                        break;
+                    case VALUE_NUMBER_FLOAT:
+                        startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser);
+                        break;
+                    case END_ARRAY:
+                        //if the query doesn't ask for an atomic value
+                        if (!this.literal && this.pathMatch()) {
+                            //check if the path asked from the query includes the current path 
+                            abStack.get(levelArray - 1).finish();
+                            if (itemStack.size() > 1) {
+                                if (checkItem == itemType.ARRAY) {
+                                    if (levelArray > this.arrayMatchLevel + 1) {
+                                        abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject));
+                                    } else if (this.matched) {
+                                        this.matched = false;
+                                        items++;
+                                        writeElement(abvsStack.get(levelArray + levelObject));
+                                    }
+                                } else if (checkItem == itemType.OBJECT) {
+                                    if (levelArray > this.arrayMatchLevel && !this.matched) {
+                                        obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1),
+                                                abvsStack.get(levelArray + levelObject));
+                                    } else if (this.matched) {
+                                        writeElement(abvsStack.get(levelArray + levelObject));
+                                        this.matched = false;
+                                        items++;
+                                    }
+                                }
+                            }
+                        }
+                        if (allKeys.size() - 1 >= 0) {
+                            allKeys.remove(allKeys.size() - 1);
+                        }
+                        this.arrayCounters.remove(levelArray - 1);
+                        itemStack.remove(itemStack.size() - 1);
+                        levelArray--;
+                        break;
+                    case END_OBJECT:
+                        //if the query doesn't ask for an atomic value
+                        if (!this.literal && this.pathMatch()) {
+                            //check if the path asked from the query includes the current path 
+                            obStack.get(levelObject - 1).finish();
+                            if (itemStack.size() > 1) {
+                                if (checkItem == itemType.OBJECT) {
+                                    if (levelObject > this.objectMatchLevel) {
+                                        obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2),
+                                                abvsStack.get(levelArray + levelObject));
+                                    } else if (this.matched) {
+                                        this.matched = false;
+                                        items++;
+                                        writeElement(abvsStack.get(levelArray + levelObject));
+                                    }
+                                } else if (checkItem == itemType.ARRAY) {
+                                    abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject));
+                                    if (this.matched) {
+                                        writeElement(abvsStack.get(levelArray + levelObject));
+                                        this.matched = false;
+                                    }
+                                }
+                            }
+                        }
+                        if (allKeys.size() - 1 >= 0) {
+                            allKeys.remove(allKeys.size() - 1);
+                        }
+                        itemStack.remove(itemStack.size() - 1);
+                        levelObject--;
+                        break;
+                    default:
+                        break;
+                }
+                token = parser.nextToken();
+            }
+            sb.finish();
+        } catch (Exception e) {
+            throw new HyracksDataException("Accessing or writing in out of bounds space", e);
         }
         return items;
     }
 
+    private boolean pathMatch() {
+        outputStream.reset();
+        for (Byte[] bb : allKeys) {
+            outputStream.write(ArrayUtils.toPrimitive(bb), 0, ArrayUtils.toPrimitive(bb).length);
+        }
+        //the path of values created by parsing the file 
+        boolean contains = false;
+        this.matched = false;
+        prefixStream.reset();
+        if (pathStream.size() < outputStream.size()) {
+            prefixStream.write(outputStream.toByteArray(), 0, pathStream.size());
+            contains = Arrays.equals(prefixStream.toByteArray(), pathStream.toByteArray());
+        } else {
+            prefixStream.write(pathStream.toByteArray(), 0, outputStream.size());
+            contains = Arrays.equals(prefixStream.toByteArray(), outputStream.toByteArray());
+        }
+        if (pathStream.size() == outputStream.size() && contains) {
+            this.objectMatchLevel = this.levelObject;
+            this.matched = true;
+            this.literal = false;
+        }
+        return contains;
+    }
+
+    public void itemsInArray() {
+        if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY && !this.arrayCounters.isEmpty()) {
+            boolean addCounter = levelArray - 1 < this.keysOrMembers.size() ? this.keysOrMembers.get(levelArray - 1)
+                    : true;
+            if (addCounter) {
+                this.arrayCounters.set(levelArray - 1, this.arrayCounters.get(levelArray - 1) + 1);
+                this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - 1)));
+            } else {
+                Byte[] bool = { (byte) 0x2B, 0x01 };
+                this.allKeys.add(bool);
+            }
+        }
+    }
+
     public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValueBuilder svb, int levelArray,
             int levelObject) throws IOException {
         abvsStack.get(0).reset();
@@ -189,12 +389,141 @@ public class JSONParser implements IParser {
         } else if (tag == ValueTag.XS_INTEGER_TAG) {
             out.writeLong(parser.getLongValue());
         }
-        if (itemStack.size() != 0) {
+        if (!itemStack.isEmpty()) {
             if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) {
                 abStack.get(levelArray - 1).addItem(abvsStack.get(0));
+                if (valueSeq != null && this.matched && levelArray == this.arrayMatchLevel) {
+                    this.literal = true;
+                    this.matched = false;
+                    writeElement(abvsStack.get(0));
+                }
             } else if (itemStack.get(itemStack.size() - 1) == itemType.OBJECT) {
                 obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0));
+                if (valueSeq != null && this.matched && levelObject == this.objectMatchLevel) {
+                    this.literal = true;
+                    this.matched = false;
+                    writeElement(abvsStack.get(0));
+                }
+            }
+        }
+    }
+
+    public void writeElement(ArrayBackedValueStorage abvs) throws IOException {
+        tempABVS.reset();
+        DataOutput out = tempABVS.getDataOutput();
+        out.write(abvs.getByteArray(), abvs.getStartOffset(), abvs.getLength());
+        FrameUtils.appendFieldToWriter(writer, appender, tempABVS.getByteArray(), tempABVS.getStartOffset(),
+                tempABVS.getLength());
+    }
+
+    public void startArrayOrObjects(int count) {
+        if (valueSeq != null && !this.arrayCounters.isEmpty()) {
+            boolean addCounter = levelArray - count < this.keysOrMembers.size()
+                    ? this.keysOrMembers.get(levelArray - count) : true;
+            if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) {
+                if (addCounter) {
+                    this.arrayCounters.set(levelArray - count, this.arrayCounters.get(levelArray - count) + 1);
+                    this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - count)));
+                } else {
+                    XDMConstants.setTrue(bp);
+                    this.allKeys.add(ArrayUtils.toObject(bp.getByteArray()));
+                }
+            }
+
+        }
+        if (count == 2 && valueSeq != null) {
+            this.arrayCounters.add(Integer.valueOf(0));
+        }
+    }
+
+    public void startArray() throws HyracksDataException {
+        levelArray++;
+        if (levelArray > abStack.size()) {
+            abStack.add(new ArrayBuilder());
+        }
+        if (levelArray + levelObject > abvsStack.size() - 1) {
+            abvsStack.add(new ArrayBackedValueStorage());
+        }
+        startArrayOrObjects(2);
+        itemStack.add(itemType.ARRAY);
+        if (this.pathMatch() || this.valueSeq == null) {
+            abvsStack.get(levelArray + levelObject).reset();
+            try {
+                abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject));
+            } catch (Exception e) {
+                throw new HyracksDataException("Accessing index out of bounds", e);
+            }
+        }
+    }
+
+    public void startObject() throws HyracksDataException {
+        levelObject++;
+        if (levelObject > obStack.size()) {
+            obStack.add(new ObjectBuilder());
+        }
+        if (levelArray + levelObject > abvsStack.size() - 1) {
+            abvsStack.add(new ArrayBackedValueStorage());
+        }
+        startArrayOrObjects(1);
+        itemStack.add(itemType.OBJECT);
+        if (this.pathMatch() || this.valueSeq == null) {
+            abvsStack.get(levelArray + levelObject).reset();
+            try {
+                obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject));
+            } catch (Exception e) {
+                throw new HyracksDataException("Accessing index out of bounds", e);
             }
         }
     }
+
+    public void startFieldName(JsonParser parser) throws HyracksDataException {
+        if (levelObject > spStack.size()) {
+            keyStack.add(new ArrayBackedValueStorage());
+            spStack.add(new UTF8StringPointable());
+        }
+        keyStack.get(levelObject - 1).reset();
+        DataOutput outk = keyStack.get(levelObject - 1).getDataOutput();
+        try {
+            svb.write(parser.getText(), outk);
+            spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1));
+            if (this.valueSeq != null) {
+                int length = 0;
+                byte[] barr = spStack.get(levelObject - 1).getByteArray();
+                outputStream.reset();
+                outputStream.write(barr, 0, spStack.get(levelObject - 1).getLength());
+                allKeys.add(ArrayUtils.toObject(outputStream.toByteArray()));
+                for (int i = 0; i < allKeys.size() - 1; i++) {
+                    tvp.set(ArrayUtils.toPrimitive(allKeys.get(i)), 0, ArrayUtils.toPrimitive(allKeys.get(i)).length);
+                    length += ArrayUtils.toPrimitive(allKeys.get(i)).length;
+                }
+                //if the next two bytes represent a boolean (boolean has only two bytes), 
+                //it means that query asks for all the keys of the object
+                if (length <= pathStream.size() && (length + 2) <= pathStream.size()) {
+                    tvp.set(pathStream.toByteArray(), length, length + 2);
+                    if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) {
+                        abvsStack.get(0).reset();
+                        out.write(ValueTag.XS_STRING_TAG);
+                        svb.write(parser.getText(), out);
+                        writeElement(abvsStack.get(0));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException("Writing in out of bounds space", e);
+        }
+    }
+
+    public void startAtomicValues(int tag, JsonParser parser) throws HyracksDataException {
+        itemsInArray();
+        if (this.pathMatch() || this.valueSeq == null) {
+            try {
+                atomicValues(tag, parser, out, svb, levelArray, levelObject);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if (allKeys.size() - 1 >= 0) {
+            allKeys.remove(allKeys.size() - 1);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
index df6fb4b..2459944 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java
@@ -18,37 +18,102 @@ package org.apache.vxquery.metadata;
 
 import java.util.List;
 
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 
-public abstract class AbstractVXQueryDataSource implements IDataSource<String> {
+public abstract class AbstractVXQueryDataSource implements IVXQueryDataSource {
     protected static final String DELIMITER = "\\|";
     protected int dataSourceId;
     protected String collectionName;
     protected String[] collectionPartitions;
-    protected String elementPath;
+
     protected List<Integer> childSeq;
+    protected List<Byte[]> valueSeq;
     protected int totalDataSources;
     protected String tag;
-    protected String function;
 
     protected Object[] types;
 
     protected IDataSourcePropertiesProvider propProvider;
-
-    public abstract String getFunctionCall();
+    
+    @Override
+    public INodeDomain getDomain() {
+        return null;
+    }
 
     @Override
     public boolean isScanAccessPathALeaf() {
-        // TODO Auto-generated method stub
         return false;
     }
 
+    public int getTotalDataSources() {
+        return totalDataSources;
+    }
+
+    public void setTotalDataSources(int totalDataSources) {
+        this.totalDataSources = totalDataSources;
+    }
+
+    public int getDataSourceId() {
+        return dataSourceId;
+    }
+
+    public int getPartitionCount() {
+        return collectionPartitions.length;
+    }
+
+    public String getTag() {
+        return this.tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
     @Override
-    public INodeDomain getDomain() {
-        // TODO Auto-generated method stub
-        return null;
+    public String getId() {
+        return collectionName;
+    }
+
+    @Override
+    public Object[] getSchemaTypes() {
+        return types;
+    }
+
+    @Override
+    public IDataSourcePropertiesProvider getPropertiesProvider() {
+        return propProvider;
+    }
+
+    @Override
+    public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
+    }
+
+    public void addChildSeq(int integer) {
+        childSeq.add(integer);
+    }
+
+    public List<Integer> getChildSeq() {
+        return childSeq;
+    }
+
+    public void addValueSeq(Byte[] value) {
+        valueSeq.add(value);
+    }
+
+    public List<Byte[]> getValueSeq() {
+        return valueSeq;
+    }
+
+    public String[] getPartitions() {
+        return collectionPartitions;
+    }
+
+    public void setPartitions(String[] collectionPartitions) {
+        this.collectionPartitions = collectionPartitions;
     }
 
+    abstract public boolean usingIndex();
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java
new file mode 100644
index 0000000..8e71339
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.metadata;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+
+public interface IVXQueryDataSource extends IDataSource<String> {
+    boolean usingIndex();
+    
+    void addChildSeq(int integer);
+    
+    List<Integer> getChildSeq();
+}
+

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index bee7c7b..c5761c5 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
-import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
@@ -29,95 +28,38 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
 import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain;
 
 public class VXQueryCollectionDataSource extends AbstractVXQueryDataSource {
-
-    private VXQueryCollectionDataSource(int id, String file, Object[] types) {
+    private VXQueryCollectionDataSource(int id, String collection, Object[] types) {
         this.dataSourceId = id;
-        this.collectionName = file;
-        collectionPartitions = collectionName.split(DELIMITER);
+        this.collectionName = collection;
+        this.collectionPartitions = collectionName.split(DELIMITER);
         this.types = types;
+
         final IPhysicalPropertiesVector vec = new StructuralPropertiesVector(
                 new RandomPartitioningProperty(new CollectionFileDomain(collectionName)),
                 new ArrayList<ILocalStructuralProperty>());
-        propProvider = new IDataSourcePropertiesProvider() {
+        this.propProvider = new IDataSourcePropertiesProvider() {
             @Override
             public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
                 return vec;
             }
         };
-        this.childSeq = new ArrayList<>();
         this.tag = null;
+        this.childSeq = new ArrayList<>();
+        this.valueSeq = new ArrayList<>();
     }
 
-    public static VXQueryCollectionDataSource create(int id, String file, Object type) {
-        return new VXQueryCollectionDataSource(id, file, new Object[] { type });
-    }
-
-    public int getTotalDataSources() {
-        return totalDataSources;
-    }
-
-    public void setTotalDataSources(int totalDataSources) {
-        this.totalDataSources = totalDataSources;
-    }
-
-    public int getDataSourceId() {
-        return dataSourceId;
-    }
-
-    public String[] getPartitions() {
-        return collectionPartitions;
-    }
-
-    public void setPartitions(String[] collectionPartitions) {
-        this.collectionPartitions = collectionPartitions;
-    }
-
-    public int getPartitionCount() {
-        return collectionPartitions.length;
-    }
-
-    public String getTag() {
-        return this.tag;
-    }
-
-    public void setTag(String tag) {
-        this.tag = tag;
-    }
-
-    @Override
-    public String getId() {
-        return collectionName;
-    }
-
-    @Override
-    public Object[] getSchemaTypes() {
-        return types;
-    }
-
-    @Override
-    public IDataSourcePropertiesProvider getPropertiesProvider() {
-        return propProvider;
-    }
-
-    @Override
-    public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
-    }
-
-    public void addChildSeq(int integer) {
-        childSeq.add(integer);
-    }
-
-    public List<Integer> getChildSeq() {
-        return childSeq;
+    public static VXQueryCollectionDataSource create(int id, String collection, Object type) {
+        return new VXQueryCollectionDataSource(id, collection, new Object[] { type });
     }
 
     @Override
     public String toString() {
-        return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq + "]";
+        return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq
+                + ", valueSeq=" + valueSeq + "]";
     }
 
-    @Override
-    public String getFunctionCall() {
-        return function;
+    public boolean usingIndex() {
+        return false;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index be95f93..623b48c 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -77,6 +77,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
     private short totalDataSources;
     private String[] collectionPartitions;
     private List<Integer> childSeq;
+    private List<Byte[]> valueSeq;
     protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
     private HDFSFunctions hdfs;
     private String tag;
@@ -84,13 +85,14 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
     private final String hdfsConf;
     private final Map<String, NodeControllerInfo> nodeControllerInfos;
 
-    public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds,
+    public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, AbstractVXQueryDataSource ds,
             RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
         super(spec, 1, 1);
         collectionPartitions = ds.getPartitions();
         dataSourceId = (short) ds.getDataSourceId();
         totalDataSources = (short) ds.getTotalDataSources();
         childSeq = ds.getChildSeq();
+        valueSeq = ds.getValueSeq();
         recordDescriptors[0] = rDesc;
         this.tag = ds.getTag();
         this.hdfsConf = hdfsConf;
@@ -113,7 +115,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
         final String collectionName = collectionPartitions[partition % collectionPartitions.length];
         final XMLParser parser = new XMLParser(false, nodeIdProvider, nodeId, appender, childSeq,
                 dCtx.getStaticContext());
-        final JSONParser jparser = new JSONParser();
+        final JSONParser jparser = new JSONParser(valueSeq);
 
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override
@@ -130,7 +132,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                 Reader input;
                 if (!collectionModifiedName.contains("hdfs:/")) {
                     File collectionDirectory = new File(collectionModifiedName);
-                    //check if directory is in the local file system
+                    // check if directory is in the local file system
                     if (collectionDirectory.exists()) {
                         // Go through each tuple.
                         if (collectionDirectory.isDirectory()) {
@@ -152,9 +154,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                                         try {
                                             jsonAbvs.reset();
                                             input = new InputStreamReader(new FileInputStream(file));
-                                            jparser.parse(input, jsonAbvs);
-                                            FrameUtils.appendFieldToWriter(writer, appender, jsonAbvs.getByteArray(),
-                                                    jsonAbvs.getStartOffset(), jsonAbvs.getLength());
+                                            jparser.parse(input, jsonAbvs, writer, appender);
                                         } catch (FileNotFoundException e) {
                                             throw new HyracksDataException(e.toString());
                                         }
@@ -197,14 +197,14 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                                 RecordReader reader;
                                 TaskAttemptContext context;
                                 for (int i = 0; i < size; i++) {
-                                    //read split
+                                    // read split
                                     context = ctxFactory.createContext(job.getConfiguration(), i);
-
                                     reader = inputFormat.createRecordReader(inputSplits.get(i), context);
                                     reader.initialize(inputSplits.get(i), context);
                                     while (reader.nextKeyValue()) {
                                         value = reader.getCurrentValue().toString();
-                                        //Split value if it contains more than one item with the tag
+                                        // Split value if it contains more than
+                                        // one item with the tag
                                         if (StringUtils.countMatches(value, tag) > 1) {
                                             String[] items = value.split(tag);
                                             for (String item : items) {
@@ -218,7 +218,9 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                                             }
                                         } else {
                                             value = START_TAG + value;
-                                            //create an input stream to the file currently reading and send it to parser
+                                            // create an input stream to the
+                                            // file currently reading and send
+                                            // it to parser
                                             stream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
                                             parser.parseHDFSElements(stream, writer, fta, i);
                                             stream.close();
@@ -232,10 +234,10 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                             }
                         } else {
                             try {
-                                //check if the path exists and is a directory
+                                // check if the path exists and is a directory
                                 if (fs.exists(directory) && fs.isDirectory(directory)) {
                                     for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
-                                        //read every file in the directory
+                                        // read every file in the directory
                                         RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
                                         while (it.hasNext()) {
                                             xmlDocument = it.next().getPath();
@@ -244,7 +246,9 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                                                     LOGGER.fine(
                                                             "Starting to read XML document: " + xmlDocument.getName());
                                                 }
-                                                //create an input stream to the file currently reading and send it to parser
+                                                // create an input stream to the
+                                                // file currently reading and
+                                                // send it to parser
                                                 InputStream in = fs.open(xmlDocument).getWrappedStream();
                                                 parser.parseHDFSElements(in, writer, fta, tupleIndex);
                                                 in.close();

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
index ddbc984..ea69cfd 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
@@ -31,9 +31,9 @@ import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain;
  */
 public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
 
-    protected Object[] types;
+    private String elementPath;
+    private String function;
 
-    protected IDataSourcePropertiesProvider propProvider;
     private VXQueryIndexingDataSource(int id, String collection, String elementPath, Object[] types,
             String functionCall) {
         this.dataSourceId = id;
@@ -41,12 +41,11 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
         this.elementPath = elementPath;
         this.function = functionCall;
         this.collectionPartitions = collectionName.split(DELIMITER);
-
         this.types = types;
+
         final IPhysicalPropertiesVector vec = new StructuralPropertiesVector(
-                new RandomPartitioningProperty(new CollectionFileDomain(collectionName)),
-                new ArrayList<>());
-        propProvider = new IDataSourcePropertiesProvider() {
+                new RandomPartitioningProperty(new CollectionFileDomain(collectionName)), new ArrayList<>());
+        this.propProvider = new IDataSourcePropertiesProvider() {
             @Override
             public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
                 return vec;
@@ -54,83 +53,30 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
         };
         this.tag = null;
         this.childSeq = new ArrayList<>();
+        this.valueSeq = new ArrayList<>();
     }
 
-    public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type, String
-            function) {
+    public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type,
+            String function) {
         return new VXQueryIndexingDataSource(id, collection, index, new Object[] { type }, function);
     }
 
-    public int getTotalDataSources() {
-        return totalDataSources;
-    }
-
-    public void setTotalDataSources(int totalDataSources) {
-        this.totalDataSources = totalDataSources;
-    }
-
-    public int getDataSourceId() {
-        return dataSourceId;
-    }
-
     public String getElementPath() {
         return elementPath;
     }
 
-    public String[] getCollectionPartitions() {
-        return collectionPartitions;
-    }
-
-    public void setCollectionPartitions(String[] collectionPartitions) {
-        this.collectionPartitions = collectionPartitions;
-    }
-
-    public int getPartitionCount() {
-        return collectionPartitions.length;
-    }
-
-    public String getTag() {
-        return this.tag;
-    }
-
-    public void setTag(String tag) {
-        this.tag = tag;
-    }
-
-    @Override
-    public String getId() {
-        return collectionName;
-    }
-
-    @Override
-    public Object[] getSchemaTypes() {
-        return types;
-    }
-
-    @Override
-    public IDataSourcePropertiesProvider getPropertiesProvider() {
-        return propProvider;
-    }
-
-    @Override
-    public void computeFDs(List scanVariables, List fdList) {
+    public String getFunctionCall() {
+        return function;
     }
 
     @Override
     public String toString() {
-        return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath + " "
-                + "function=" + function + "]";
+        return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath
+                + ", function=" + function + "]";
     }
 
-    @Override
-    public String getFunctionCall() {
-        return function;
-    }
-
-    public List<Integer> getChildSeq() {
-        return childSeq;
+    public boolean usingIndex() {
+        return true;
     }
 
 }
-
-

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
index a24a629..ac92a0e 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
@@ -68,7 +68,7 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe
             RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
         super(spec, 1, 1);
         this.functionCall = ds.getFunctionCall();
-        collectionPartitions = ds.getCollectionPartitions();
+        collectionPartitions = ds.getPartitions();
         dataSourceId = (short) ds.getDataSourceId();
         totalDataSources = (short) ds.getTotalDataSources();
         recordDescriptors[0] = rDesc;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index e552f68..f6644d6 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -88,43 +88,32 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-            throws AlgebricksException {
-        VXQueryCollectionDataSource ds = null;
-        VXQueryIndexingDataSource ids = null;
-
-        try {
-            ids = (VXQueryIndexingDataSource) dataSource;
-        } catch (ClassCastException e) {
-            ds = (VXQueryCollectionDataSource) dataSource;
-        }
+                    throws AlgebricksException {
+        AbstractVXQueryDataSource ds = (AbstractVXQueryDataSource) dataSource;
         if (sourceFileMap != null) {
-            final int len = ds != null ? ds.getPartitions().length : ids.getCollectionPartitions().length;
+            final int len = ds.getPartitions().length;
             String[] collectionPartitions = new String[len];
             for (int i = 0; i < len; ++i) {
-                String partition = ds != null ? ds.getPartitions()[i] : ids.getCollectionPartitions()[i];
+                String partition = ds.getPartitions()[i];
                 File mapped = sourceFileMap.get(partition);
                 collectionPartitions[i] = mapped != null ? mapped.toString() : partition;
             }
-            if (ds != null) {
-                ds.setPartitions(collectionPartitions);
-            } else {
-                ids.setCollectionPartitions(collectionPartitions);
-            }
+            ds.setPartitions(collectionPartitions);
         }
         RecordDescriptor rDesc;
         IOperatorDescriptor scanner;
         AlgebricksPartitionConstraint constraint;
 
-        if (ds != null) {
+        if (!ds.usingIndex()) {
             rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
             scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf,
                     this.nodeControllerInfos);
             constraint = getClusterLocations(nodeList, ds.getPartitionCount());
         } else {
             rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
-            scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, ids, rDesc, this.hdfsConf,
-                    this.nodeControllerInfos);
-            constraint = getClusterLocations(nodeList, ids.getPartitionCount());
+            scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, (VXQueryIndexingDataSource) ds, rDesc,
+                    this.hdfsConf, this.nodeControllerInfos);
+            constraint = getClusterLocations(nodeList, ds.getPartitionCount());
         }
 
         return new Pair<>(scanner, constraint);
@@ -245,7 +234,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
             JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
-
+    
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -254,7 +243,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
             JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
-
+    
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
             IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -263,7 +252,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
             JobSpecification jobSpec) throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
-
+    
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
             IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -274,10 +263,11 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
             JobSpecification spec) throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
-
+    
     @Override
     public Map<String, String> getConfig() {
         return new HashMap<>();
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
index 4117774..a8be359 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java
@@ -16,13 +16,17 @@
  */
 package org.apache.vxquery.runtime.functions.base;
 
+import org.apache.vxquery.datamodel.accessors.ArrayBackedValueStoragePool;
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.PointablePoolFactory;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.exceptions.SystemException;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.vxquery.datamodel.accessors.PointablePool;
-import org.apache.vxquery.datamodel.accessors.PointablePoolFactory;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
 
 public abstract class AbstractTaggedValueArgumentUnnestingEvaluator implements IUnnestingEvaluator {
     private final IScalarEvaluator[] args;
@@ -30,6 +34,7 @@ public abstract class AbstractTaggedValueArgumentUnnestingEvaluator implements I
     protected final TaggedValuePointable[] tvps;
 
     protected final PointablePool ppool = PointablePoolFactory.INSTANCE.createPointablePool();
+    protected final ArrayBackedValueStoragePool abvsPool = new ArrayBackedValueStoragePool();
 
     public AbstractTaggedValueArgumentUnnestingEvaluator(IScalarEvaluator[] args) {
         this.args = args;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
index d255345..b127d2a 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java
@@ -21,10 +21,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.accessors.jsonitem.ArrayPointable;
-import org.apache.vxquery.datamodel.accessors.jsonitem.ObjectPointable;
 import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
-import org.apache.vxquery.datamodel.values.ValueTag;
 import org.apache.vxquery.exceptions.ErrorCode;
 import org.apache.vxquery.exceptions.SystemException;
 import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator;
@@ -33,42 +30,32 @@ import java.io.IOException;
 
 public class KeysOrMembersScalarEvaluator extends AbstractTaggedValueArgumentScalarEvaluator {
     protected final IHyracksTaskContext ctx;
-    private final ObjectPointable op;
-    private final ArrayPointable ap;
+    private ArrayBackedValueStorage abvs;
     private final SequenceBuilder sb;
     private final TaggedValuePointable tempTvp;
+    private final KeysOrMembersUnnesting keysOrMembers;
 
     public KeysOrMembersScalarEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) {
         super(args);
         this.ctx = ctx;
-        op = (ObjectPointable) ObjectPointable.FACTORY.createPointable();
-        ap = (ArrayPointable) ArrayPointable.FACTORY.createPointable();
+        abvs = new ArrayBackedValueStorage();
         sb = new SequenceBuilder();
         tempTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+        keysOrMembers = new KeysOrMembersUnnesting(ctx, ppool);
     }
 
     @Override
     protected void evaluate(TaggedValuePointable[] args, IPointable result) throws SystemException {
-        final TaggedValuePointable tvp = args[0];
-        ArrayBackedValueStorage abvs = abvsPool.takeOne();
+        abvs = abvsPool.takeOne();
+        keysOrMembers.init(args);
         try {
-            switch (tvp.getTag()) {
-                case ValueTag.OBJECT_TAG:
-                    tvp.getValue(op);
-                    op.getKeys(abvs);
-                    result.set(abvs);
-                    break;
-                case ValueTag.ARRAY_TAG:
-                    abvs.reset();
-                    sb.reset(abvs);
-                    tvp.getValue(ap);
-                    ap.appendItems(sb);
-                    sb.finish();
-                    result.set(abvs);
-                    break;
-                default:
-                    throw new SystemException(ErrorCode.FORG0006);
+            abvs.reset();
+            sb.reset(abvs);
+            while (keysOrMembers.step(tempTvp)) {
+                sb.addItem(tempTvp);
             }
+            sb.finish();
+            result.set(abvs);
         } catch (IOException e) {
             throw new SystemException(ErrorCode.SYSE0001, e);
         } finally {

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java
new file mode 100644
index 0000000..0e2597c
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.vxquery.runtime.functions.json;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.vxquery.datamodel.accessors.ArrayBackedValueStoragePool;
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.SequencePointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.accessors.jsonitem.ArrayPointable;
+import org.apache.vxquery.datamodel.accessors.jsonitem.ObjectPointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.step.AbstractForwardAxisPathStep;
+
+public class KeysOrMembersUnnesting extends AbstractForwardAxisPathStep {
+    private final ArrayPointable ap = (ArrayPointable) ArrayPointable.FACTORY.createPointable();
+    private final SequencePointable sp = (SequencePointable) SequencePointable.FACTORY.createPointable();
+    private final ObjectPointable op = (ObjectPointable) ObjectPointable.FACTORY.createPointable();
+    private final TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+    private final TaggedValuePointable tempTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+    private TaggedValuePointable arg = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+    private final ArrayBackedValueStoragePool abvsPool = new ArrayBackedValueStoragePool();
+    private ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+    private int arOrObArgsLength;
+    private int indexArrayArgs;
+
+    public KeysOrMembersUnnesting(IHyracksTaskContext ctx, PointablePool pp) {
+        super(ctx, pp);
+    }
+
+    protected void init(TaggedValuePointable[] args) throws SystemException {
+        abvs = abvsPool.takeOne();
+        indexArrayArgs = 0;
+        arg = args[0];
+        switch (arg.getTag()) {
+            case ValueTag.OBJECT_TAG:
+                arg.getValue(op);
+                arOrObArgsLength = op.getEntryCount();
+                break;
+            case ValueTag.ARRAY_TAG:
+                arg.getValue(ap);
+                arOrObArgsLength = ap.getEntryCount();
+                break;
+            default:
+                throw new SystemException(ErrorCode.FORG0006);
+        }
+    }
+
+    public boolean step(IPointable result) throws SystemException {
+        abvs = abvsPool.takeOne();
+        if (arOrObArgsLength > 0) {
+            while (indexArrayArgs < arOrObArgsLength) {
+                if (arg.getTag() == ValueTag.ARRAY_TAG) {
+                    ap.getEntry(indexArrayArgs, tvp);
+                } else {
+                    try {
+                        op.getKeys(abvs);
+                    } catch (IOException e) {
+                        throw new SystemException(ErrorCode.SYSE0001, e);
+                    }
+                    tempTvp.set(abvs);
+                    tempTvp.getValue(sp);
+                    sp.getEntry(indexArrayArgs, tvp);
+                }
+                result.set(tvp.getByteArray(), tvp.getStartOffset(), tvp.getLength());
+                indexArrayArgs++;
+                return true;
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java
new file mode 100644
index 0000000..a4e146c
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.vxquery.runtime.functions.json;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluator;
+
+public class KeysOrMembersUnnestingEvaluator extends AbstractTaggedValueArgumentUnnestingEvaluator {
+    private final KeysOrMembersUnnesting keysOrMembersStep;
+
+    public KeysOrMembersUnnestingEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) {
+        super(args);
+        keysOrMembersStep = new KeysOrMembersUnnesting(ctx, ppool);
+    }
+
+    @Override
+    public boolean step(IPointable result) throws HyracksDataException {
+        return keysOrMembersStep.step(result);
+    }
+
+    @Override
+    protected void init(TaggedValuePointable[] args) throws HyracksDataException {
+        keysOrMembersStep.init(args);
+
+    }
+}