You are viewing a plain-text version of this content. The hyperlink to its canonical version was lost in this plain-text rendering.
Posted to commits@vxquery.apache.org by ti...@apache.org on 2014/03/14 23:09:32 UTC

[11/25] git commit: This commit holds three ideas: local partitioning, reusing the parser per runtime, and optimizations for the child path step. Since all of these changes touch VXQueryCollectionOperatorDescriptor and are being pulled from another branch, I have kept them together in this one commit.

This commit combines three changes: local partitioning, reusing the XML parser per runtime, and optimizations for the child path step.
Since all of these changes touch VXQueryCollectionOperatorDescriptor and are being pulled from another branch, I have kept them together in this single commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/f72feda7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/f72feda7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/f72feda7

Branch: refs/heads/westmann/prettyprint
Commit: f72feda74fcead83e807c5168a830b91bbedbb26
Parents: d146525
Author: Preston Carman <pr...@apache.org>
Authored: Thu Feb 13 16:03:34 2014 -0800
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Feb 13 16:03:34 2014 -0800

----------------------------------------------------------------------
 .../builders/nodes/DictionaryBuilder.java       |   7 +
 .../vxquery/functions/builtin-functions.xml     |   2 -
 .../metadata/VXQueryCollectionDataSource.java   |  12 ++
 .../VXQueryCollectionOperatorDescriptor.java    | 139 +++++++------
 .../vxquery/metadata/VXQueryIOFileFilter.java   |  37 ++++
 .../metadata/VXQueryMetadataProvider.java       |  32 +--
 ...GeneralComparisonScalarEvaluatorFactory.java |   2 +-
 .../FnCollectionScalarEvaluatorFactory.java     | 117 -----------
 .../node/FnDocScalarEvaluatorFactory.java       |  10 +-
 .../functions/step/AbstractChildPathStep.java   |  74 +++++++
 .../runtime/functions/step/ChildPathStep.java   | 200 -------------------
 .../step/ChildPathStepOperatorDescriptor.java   | 153 ++++++++++++++
 .../step/ChildPathStepScalarEvaluator.java      |  13 +-
 .../functions/step/ChildPathStepUnnesting.java  | 126 ++++++++++++
 .../step/ChildPathStepUnnestingEvaluator.java   |  44 ++++
 .../ChildPathStepUnnestingEvaluatorFactory.java |  18 +-
 .../runtime/functions/util/FunctionHelper.java  | 189 ++----------------
 .../vxquery/xmlparser/SAXContentHandler.java    |   2 +
 .../org/apache/vxquery/xmlparser/XMLParser.java |  27 +++
 19 files changed, 602 insertions(+), 602 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/datamodel/builders/nodes/DictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/datamodel/builders/nodes/DictionaryBuilder.java b/vxquery-core/src/main/java/org/apache/vxquery/datamodel/builders/nodes/DictionaryBuilder.java
index 41cc98c..20b7333 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/datamodel/builders/nodes/DictionaryBuilder.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/datamodel/builders/nodes/DictionaryBuilder.java
@@ -78,6 +78,13 @@ public class DictionaryBuilder {
         tempOut = new DataOutputStream(tempStringData);
         tempStringPointable = (UTF8StringPointable) UTF8StringPointable.FACTORY.createPointable();
     }
+    
+    public void reset() {
+        stringEndOffsets.clear();
+        sortedSlotIndexes.clear();
+        dataBuffer.reset();
+        tempStringData.reset();
+    }
 
     public void write(ArrayBackedValueStorage abvs) throws IOException {
         DataOutput out = abvs.getDataOutput();

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index cb1afc2..b439a83 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -125,8 +125,6 @@
     <function name="fn:collection">
         <param name="arg" type="xs:string?"/>
         <return type="node()*"/>
-        <!-- All fn:collection query plans are rewritten to use Algebricks' datasource_scan operator. -->
-        <runtime type="scalar" class="org.apache.vxquery.runtime.functions.node.FnCollectionScalarEvaluatorFactory"/>
     </function>
     
     <!-- fn:compare($comparand1  as xs:string?, $comparand2 as xs:string?)  as xs:integer?  -->

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index 3fabeda..1f9f0a7 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -16,6 +16,7 @@
  */
 package org.apache.vxquery.metadata;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -31,8 +32,10 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioning
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 
 public class VXQueryCollectionDataSource implements IDataSource<String> {
+    private static final String DELIMITER = "\\|";
     private final int dataSourceId;
     private final String collectionName;
+    private final String[] collectionPartitions;
     private final List<Integer> childSeq;
     private int totalDataSources;
 
@@ -43,6 +46,7 @@ public class VXQueryCollectionDataSource implements IDataSource<String> {
     public VXQueryCollectionDataSource(int id, String file, Object[] types) {
         this.dataSourceId = id;
         this.collectionName = file;
+        collectionPartitions = collectionName.split(DELIMITER);
         this.types = types;
         final IPhysicalPropertiesVector vec = new StructuralPropertiesVector(new RandomPartitioningProperty(
                 new CollectionFileDomain(collectionName)), new ArrayList<ILocalStructuralProperty>());
@@ -67,6 +71,14 @@ public class VXQueryCollectionDataSource implements IDataSource<String> {
         return dataSourceId;
     }
 
+    public String[] getPartitions() {
+        return collectionPartitions;
+    }
+    
+    public int getPartitionCount() {
+        return collectionPartitions.length;
+    }
+
     @Override
     public String getId() {
         return collectionName;

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index 7276b5a..593b8ad 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -18,14 +18,21 @@ package org.apache.vxquery.metadata;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.PointablePoolFactory;
 import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
 import org.apache.vxquery.exceptions.SystemException;
-import org.apache.vxquery.runtime.functions.step.ChildPathStep;
+import org.apache.vxquery.runtime.functions.step.ChildPathStepOperatorDescriptor;
 import org.apache.vxquery.runtime.functions.util.FunctionHelper;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
+import org.apache.vxquery.xmlparser.SAXContentHandler;
 import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
+import org.apache.vxquery.xmlparser.XMLParser;
 import org.xml.sax.InputSource;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -36,7 +43,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -47,13 +53,13 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
     private static final long serialVersionUID = 1L;
     private short dataSourceId;
     private short totalDataSources;
-    private String collectionName;
+    private String[] collectionPartitions;
     private List<Integer> childSeq;
 
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds,
             RecordDescriptor rDesc) {
         super(spec, 1, 1);
-        collectionName = ds.getId();
+        collectionPartitions = ds.getPartitions();
         dataSourceId = (short) ds.getDataSourceId();
         totalDataSources = (short) ds.getTotalDataSources();
         childSeq = ds.getChildSeq();
@@ -65,17 +71,20 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         final FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(),
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(recordDescProvider.getOutputRecordDescriptor(
-                getActivityId(), 0).getFieldCount());
+        final int fieldOutputCount = recordDescProvider.getOutputRecordDescriptor(getActivityId(), 0).getFieldCount();
         final ByteBuffer frame = ctx.allocateFrame();
-        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize(), fieldOutputCount);
         final InputSource in = new InputSource();
         final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
         final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition();
         final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
         final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
         final int frameSize = ctx.getFrameSize();
-        final IHyracksTaskContext ctx1 = ctx;
+        final PointablePool ppool = PointablePoolFactory.INSTANCE.createPointablePool();
+        final ChildPathStepOperatorDescriptor childPathStep = new ChildPathStepOperatorDescriptor(ctx, ppool);
+
+        final String collectionName = collectionPartitions[partition % collectionPartitions.length];
+        final XMLParser parser = new XMLParser(false, nodeIdProvider);;
 
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override
@@ -93,7 +102,12 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                 // Go through each tuple.
                 if (collectionDirectory.isDirectory()) {
                     for (int t = 0; t < fta.getTupleCount(); ++t) {
-                        addXmlFile(collectionDirectory, t);
+                        @SuppressWarnings("unchecked")
+                        Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
+                                TrueFileFilter.INSTANCE);
+                        while (it.hasNext()) {
+                            addNextXmlNode(it.next(), t);
+                        }
                     }
                 } else {
                     throw new HyracksDataException(
@@ -101,79 +115,76 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                 }
             }
 
-            private void addXmlFile(File collectionDirectory, int t) throws HyracksDataException {
-                // Add a field for the document node to each tuple.
-                for (File file : collectionDirectory.listFiles()) {
-                    // Add the document node to the frame output.
-                    if (FunctionHelper.readableXmlFile(file.getPath())) {
-                        // TODO Make field addition based on output tuples instead of files.
-
-                        // Now add new field.
-                        abvsFileNode.reset();
-                        try {
-                            FunctionHelper.readInDocFromString(file.getPath(), in, abvsFileNode, nodeIdProvider);
-                        } catch (Exception e) {
-                            throw new HyracksDataException(e);
-                        }
+            /**
+             * Add the document node to the frame output.
+             */
+            private void addNextXmlNode(File file, int t) throws HyracksDataException {
+                // Now add new field.
+                abvsFileNode.reset();
+                try {
+                    FunctionHelper.readInDocFromString(file, in, abvsFileNode, parser);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
 
-                        if (childSeq.isEmpty()) {
-                            // Can not fit XML file into frame.
-                            if (frameSize <= (abvsFileNode.getLength() - abvsFileNode.getStartOffset())) {
-                                throw new HyracksDataException(
-                                        "XML node is to large for the current frame size (VXQueryCollectionOperatorDescriptor.addXmlFile).");
-                            }
-                            // Return the whole XML file.
-                            tb.addField(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(),
-                                    abvsFileNode.getLength());
-                        } else {
-                            // Process child nodes.
-                            TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY
-                                    .createPointable();
-                            tvp.set(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(),
-                                    abvsFileNode.getLength());
-                            processChildStep(tb, tvp, t);
-                        }
-                    } else if (file.isDirectory()) {
-                        // Consider all XML file in sub directories.
-                        addXmlFile(file, t);
+                TaggedValuePointable tvp = ppool.takeOne(TaggedValuePointable.class);
+                if (childSeq.isEmpty()) {
+                    // Can not fit XML file into frame.
+                    if (frameSize <= (abvsFileNode.getLength() - abvsFileNode.getStartOffset())) {
+                        throw new HyracksDataException(
+                                "XML node is to large for the current frame size (VXQueryCollectionOperatorDescriptor.addXmlFile).");
                     }
+                    tvp.set(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(), abvsFileNode.getLength());
+                    addNodeToTuple(tvp, t);
+                } else {
+                    // Process child nodes.
+                    tvp.set(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(), abvsFileNode.getLength());
+                    processChildStep(tvp, t);
                 }
+                ppool.giveBack(tvp);
             }
 
-            private void processChildStep(final ArrayTupleBuilder tb, TaggedValuePointable tvp, int t)
-                    throws HyracksDataException {
-                TaggedValuePointable result = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-                ChildPathStep childPathStep = new ChildPathStep(ctx1);
+            private void processChildStep(TaggedValuePointable tvp, int t) throws HyracksDataException {
                 try {
                     childPathStep.init(tvp, childSeq);
                 } catch (SystemException e) {
                     throw new HyracksDataException("Child path step failed to load node tree.");
                 }
                 try {
+                    TaggedValuePointable result = ppool.takeOne(TaggedValuePointable.class);
                     while (childPathStep.step(result)) {
-                        // First copy all new fields over.
-                        tb.reset();
-                        if (fta.getFieldCount() > 0) {
-                            for (int f = 0; f < fta.getFieldCount(); ++f) {
-                                tb.addField(fta, t, f);
-                            }
-                        }
-                        tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
-                        // Send to the writer.
-                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            FrameUtils.flushFrame(frame, writer);
-                            appender.reset(frame, true);
-                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new HyracksDataException(
-                                        "Could not write frame (VXQueryCollectionOperatorDescriptor.createPushRuntime).");
-                            }
-                        }
+                        addNodeToTuple(result, t);
                     }
+                    ppool.giveBack(result);
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
                 }
             }
 
+            private void addNodeToTuple(TaggedValuePointable result, int t) throws HyracksDataException {
+                // Send to the writer.
+                if (!addNodeToTupleAppender(result, t)) {
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                    if (!addNodeToTupleAppender(result, t)) {
+                        throw new HyracksDataException(
+                                "Could not write frame (VXQueryCollectionOperatorDescriptor.createPushRuntime).");
+                    }
+                }
+            }
+
+            private boolean addNodeToTupleAppender(TaggedValuePointable result, int t) throws HyracksDataException {
+                // First copy all new fields over.
+                if (fta.getFieldCount() > 0) {
+                    for (int f = 0; f < fta.getFieldCount(); ++f) {
+                        if (!appender.appendField(fta, t, f)) {
+                            return false;
+                        }
+                    }
+                }
+                return appender.appendField(result.getByteArray(), result.getStartOffset(), result.getLength());
+            }
+
             @Override
             public void fail() throws HyracksDataException {
                 writer.fail();

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIOFileFilter.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIOFileFilter.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIOFileFilter.java
new file mode 100644
index 0000000..104f254
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIOFileFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.metadata;
+
+import java.io.File;
+
+import org.apache.commons.io.filefilter.IOFileFilter;
+
+public class VXQueryIOFileFilter implements IOFileFilter {
+
+    @Override
+    public boolean accept(final File file) {
+        return accept(file, file.getName());
+    }
+
+    @Override
+    public boolean accept(final File file, final String name) {
+        if (name.toLowerCase().endsWith(".xml") || name.toLowerCase().endsWith(".xml.gz")) {
+            return true;
+        }
+        return false;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index d324d80..cd323d9 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -17,6 +17,7 @@
 package org.apache.vxquery.metadata;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -38,7 +39,6 @@ import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -47,7 +47,6 @@ import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 
 public class VXQueryMetadataProvider implements IMetadataProvider<String, String> {
@@ -67,14 +66,26 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
             JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
+        VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource;
         RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
-        IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec,
-                (VXQueryCollectionDataSource) dataSource, rDesc);
+        IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc);
 
-        AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(nodeList);
+        AlgebricksPartitionConstraint constraint = getClusterLocations(nodeList, ds.getPartitionCount());
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint);
     }
 
+    public AlgebricksPartitionConstraint getClusterLocations(String[] nodeList, int partitions) {
+        ArrayList<String> locs = new ArrayList<String>();
+        for (String node : nodeList) {
+            for (int j = 0; j < partitions; j++) {
+                locs.add(node);
+            }
+        }
+        String[] cluster = new String[locs.size()];
+        cluster = locs.toArray(cluster);
+        return new AlgebricksAbsolutePartitionConstraint(cluster);
+    }
+
     @Override
     public boolean scannerOperatorIsLeaf(IDataSource<String> dataSource) {
         return false;
@@ -84,16 +95,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
             throws AlgebricksException {
-        QueryResultDataSink ds = (QueryResultDataSink) sink;
-        FileSplit[] fileSplits = ds.getFileSplits();
-        String[] locations = new String[fileSplits.length];
-        for (int i = 0; i < fileSplits.length; ++i) {
-            locations[i] = fileSplits[i].getNodeName();
-        }
-        IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories, fileSplits[0]
-                .getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
-        AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
-        return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(prf, constraint);
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/comparison/general/AbstractGeneralComparisonScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/comparison/general/AbstractGeneralComparisonScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/comparison/general/AbstractGeneralComparisonScalarEvaluatorFactory.java
index 70f2e4e..1bcd1ce 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/comparison/general/AbstractGeneralComparisonScalarEvaluatorFactory.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/comparison/general/AbstractGeneralComparisonScalarEvaluatorFactory.java
@@ -213,7 +213,7 @@ public abstract class AbstractGeneralComparisonScalarEvaluatorFactory extends
                         ah.atomize(tvpArg1, ppool, tvpTransform1);
                         tid1 = FunctionHelper.getBaseTypeForGeneralComparisons(tvpTransform1.getTag());
                     } else if (tid2 == ValueTag.NODE_TREE_TAG) {
-                        FunctionHelper.atomize(tvpArg2, tvpTransform2);
+                        ah.atomize(tvpArg2, ppool, tvpTransform2);
                         tid2 = FunctionHelper.getBaseTypeForGeneralComparisons(tvpTransform2.getTag());
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java
deleted file mode 100644
index d843d78..0000000
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnCollectionScalarEvaluatorFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.vxquery.runtime.functions.node;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
-import org.apache.vxquery.datamodel.values.ValueTag;
-import org.apache.vxquery.exceptions.ErrorCode;
-import org.apache.vxquery.exceptions.SystemException;
-import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator;
-import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluatorFactory;
-import org.apache.vxquery.runtime.functions.util.FunctionHelper;
-import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
-import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
-import org.xml.sax.InputSource;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class FnCollectionScalarEvaluatorFactory extends AbstractTaggedValueArgumentScalarEvaluatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    public FnCollectionScalarEvaluatorFactory(IScalarEvaluatorFactory[] args) {
-        super(args);
-    }
-
-    @Override
-    protected IScalarEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args)
-            throws AlgebricksException {
-        final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-        final UTF8StringPointable stringp = (UTF8StringPointable) UTF8StringPointable.FACTORY.createPointable();
-        final TaggedValuePointable nodep = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-        final ByteBufferInputStream bbis = new ByteBufferInputStream();
-        final DataInputStream di = new DataInputStream(bbis);
-        final SequenceBuilder sb = new SequenceBuilder();
-        final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
-        final InputSource in = new InputSource();
-        final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
-        final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
-
-        return new AbstractTaggedValueArgumentScalarEvaluator(args) {
-            @Override
-            protected void evaluate(TaggedValuePointable[] args, IPointable result) throws SystemException {
-                String collectionName;
-                TaggedValuePointable tvp = args[0];
-                // TODO add support empty sequence and no argument.
-                if (tvp.getTag() != ValueTag.XS_STRING_TAG) {
-                    throw new SystemException(ErrorCode.FORG0006);
-                }
-                tvp.getValue(stringp);
-                try {
-                    // Get the list of files.
-                    bbis.setByteBuffer(ByteBuffer.wrap(Arrays.copyOfRange(stringp.getByteArray(),
-                            stringp.getStartOffset(), stringp.getLength() + stringp.getStartOffset())), 0);
-                    collectionName = di.readUTF();
-                } catch (IOException e) {
-                    throw new SystemException(ErrorCode.SYSE0001, e);
-                }
-                File collectionDirectory = new File(collectionName);
-                if (!collectionDirectory.exists()) {
-                    throw new RuntimeException("The collection directory (" + collectionName + ") does not exist.");
-                }
-
-                try {
-                    abvs.reset();
-                    sb.reset(abvs);
-                    addXmlFiles(collectionDirectory);
-                    sb.finish();
-                    result.set(abvs);
-                } catch (IOException e) {
-                    throw new SystemException(ErrorCode.SYSE0001, e);
-                }
-            }
-
-            private void addXmlFiles(File collectionDirectory) throws SystemException, IOException {
-                for (File file : collectionDirectory.listFiles()) {
-                    // Add the document node to the sequence.
-                    if (FunctionHelper.readableXmlFile(file.getPath())) {
-                        abvsFileNode.reset();
-                        FunctionHelper.readInDocFromString(file.getPath(), in, abvsFileNode, nodeIdProvider);
-                        nodep.set(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(), abvsFileNode.getLength());
-                        sb.addItem(nodep);
-                    } else if (file.isDirectory()) {
-                        // Consider all XML file in sub directories.
-                        addXmlFiles(file);
-                    }
-                }
-            }
-        };
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
index c634962..8bb8af9 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
@@ -29,12 +29,14 @@ import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScal
 import org.apache.vxquery.runtime.functions.util.FunctionHelper;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
+import org.apache.vxquery.xmlparser.XMLParser;
 import org.xml.sax.InputSource;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -76,7 +78,13 @@ public class FnDocScalarEvaluatorFactory extends AbstractTaggedValueArgumentScal
                     throw new SystemException(ErrorCode.FORG0006);
                 }
                 tvp.getValue(stringp);
-                FunctionHelper.readInDocFromPointable(stringp, in, bbis, di, abvs, nodeIdProvider);
+                try {
+                    // Only one document should be parsed so its ok to have a unique parser.
+                    XMLParser parser = new XMLParser(false, nodeIdProvider);
+                    FunctionHelper.readInDocFromPointable(stringp, in, bbis, di, abvs, parser);
+                } catch (Exception e) {
+                    throw new SystemException(ErrorCode.SYSE0001, e);
+                }
                 result.set(abvs);
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/AbstractChildPathStep.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/AbstractChildPathStep.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/AbstractChildPathStep.java
new file mode 100644
index 0000000..2668719
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/AbstractChildPathStep.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.step;
+
+import java.io.IOException;
+
+import org.apache.vxquery.context.DynamicContext;
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.SequencePointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.accessors.nodes.DocumentNodePointable;
+import org.apache.vxquery.datamodel.accessors.nodes.ElementNodePointable;
+import org.apache.vxquery.datamodel.accessors.nodes.NodeTreePointable;
+import org.apache.vxquery.datamodel.builders.nodes.NodeSubTreeBuilder;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.datamodel.values.XDMConstants;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Common state and helpers for child-axis path step implementations.
+ * <p>
+ * Subclasses load the node tree being navigated into {@code ntp}; both {@code getSequence} and
+ * {@code setNodeToResult} read from it. The pointables and builder below are allocated once per
+ * instance and reused by every call.
+ */
+public abstract class AbstractChildPathStep {
+    protected final DynamicContext dCtx;
+    protected final PointablePool pp;
+    protected final NodeTreePointable ntp = (NodeTreePointable) NodeTreePointable.FACTORY.createPointable();
+
+    // Reusable scratch objects. The bytes handed out by setNodeToResult live in nodeAbvs.
+    private final ArrayBackedValueStorage nodeAbvs = new ArrayBackedValueStorage();
+    private final NodeSubTreeBuilder nstb = new NodeSubTreeBuilder();
+    private final DocumentNodePointable dnp = (DocumentNodePointable) DocumentNodePointable.FACTORY.createPointable();
+    private final ElementNodePointable enp = (ElementNodePointable) ElementNodePointable.FACTORY.createPointable();
+
+    /**
+     * @param ctx
+     *            task context whose joblet global job data is a {@code DynamicContext}
+     * @param pp
+     *            pointable pool made available to subclasses
+     */
+    public AbstractChildPathStep(IHyracksTaskContext ctx, PointablePool pp) {
+        this.pp = pp;
+        this.dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData();
+    }
+
+    /**
+     * Points {@code seqp} at the children of {@code tvp}, a node within {@code ntp}: the content
+     * of a document node, or the children of an element node that has a children chunk. Every
+     * other input (including an element without a children chunk) yields the empty sequence.
+     */
+    protected void getSequence(TaggedValuePointable tvp, SequencePointable seqp) {
+        int tag = tvp.getTag();
+        if (tag == ValueTag.DOCUMENT_NODE_TAG) {
+            tvp.getValue(dnp);
+            dnp.getContent(ntp, seqp);
+            return;
+        }
+        if (tag == ValueTag.ELEMENT_NODE_TAG) {
+            tvp.getValue(enp);
+            if (enp.childrenChunkExists()) {
+                enp.getChildrenSequence(ntp, seqp);
+                return;
+            }
+        }
+        XDMConstants.setEmptySequence(seqp);
+    }
+
+    /**
+     * Serializes {@code tvpItem}, a node of {@code ntp}, with a {@code NodeSubTreeBuilder} into an
+     * internal buffer and points {@code result} at those bytes. The buffer is reset on every call,
+     * so a previously returned result may be overwritten by the next call.
+     *
+     * @throws IOException
+     *             if the subtree builder fails
+     */
+    protected void setNodeToResult(TaggedValuePointable tvpItem, IPointable result) throws IOException {
+        nodeAbvs.reset();
+        nstb.reset(nodeAbvs);
+        nstb.setChildNode(ntp, tvpItem);
+        nstb.finish();
+        result.set(nodeAbvs.getByteArray(), nodeAbvs.getStartOffset(), nodeAbvs.getLength());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStep.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStep.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStep.java
deleted file mode 100644
index da70ca8..0000000
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStep.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.vxquery.runtime.functions.step;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.vxquery.context.DynamicContext;
-import org.apache.vxquery.datamodel.accessors.SequencePointable;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.accessors.nodes.DocumentNodePointable;
-import org.apache.vxquery.datamodel.accessors.nodes.ElementNodePointable;
-import org.apache.vxquery.datamodel.accessors.nodes.NodeTreePointable;
-import org.apache.vxquery.datamodel.builders.nodes.NodeSubTreeBuilder;
-import org.apache.vxquery.datamodel.values.ValueTag;
-import org.apache.vxquery.datamodel.values.XDMConstants;
-import org.apache.vxquery.exceptions.ErrorCode;
-import org.apache.vxquery.exceptions.SystemException;
-import org.apache.vxquery.runtime.functions.step.NodeTestFilter.INodeFilter;
-import org.apache.vxquery.types.SequenceType;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-
-public class ChildPathStep {
-    private final DynamicContext dCtx;
-    private List<INodeFilter> filter;
-    private int indexSeqArgs;
-    private int[] indexSequence;
-    private ArrayBackedValueStorage nodeAbvs;
-    private final NodeTreePointable ntp = (NodeTreePointable) NodeTreePointable.FACTORY.createPointable();
-    private int seqArgsLength;
-    private final SequencePointable seqNtp = (SequencePointable) SequencePointable.FACTORY.createPointable();
-    private final TaggedValuePointable tvpItem = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-
-    public ChildPathStep(IHyracksTaskContext ctx) {
-        dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData();
-        nodeAbvs = new ArrayBackedValueStorage();
-        filter = new ArrayList<INodeFilter>();
-    }
-
-    private void getSequence(TaggedValuePointable tvp, SequencePointable seqp) {
-        switch (tvp.getTag()) {
-            case ValueTag.DOCUMENT_NODE_TAG:
-                DocumentNodePointable dnp = (DocumentNodePointable) DocumentNodePointable.FACTORY.createPointable();
-                tvp.getValue(dnp);
-                dnp.getContent(ntp, seqp);
-                return;
-
-            case ValueTag.ELEMENT_NODE_TAG:
-                ElementNodePointable enp = (ElementNodePointable) ElementNodePointable.FACTORY.createPointable();
-                tvp.getValue(enp);
-                if (enp.childrenChunkExists()) {
-                    enp.getChildrenSequence(ntp, seqp);
-                    return;
-                }
-        }
-        TaggedValuePointable seqTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-        XDMConstants.setEmptySequence(seqTvp);
-        seqTvp.getValue(seqp);
-    }
-
-    public void init(TaggedValuePointable tvp, List<Integer> typeCodes) throws SystemException {
-        indexSequence = new int[typeCodes.size()];
-        for (int i = 0; i < typeCodes.size(); ++i) {
-            indexSequence[i] = 0;
-        }
-        indexSeqArgs = 0;
-
-        setFilterCode(typeCodes);
-
-        if (tvp.getTag() != ValueTag.NODE_TREE_TAG) {
-            throw new SystemException(ErrorCode.SYSE0001);
-        }
-        tvp.getValue(ntp);
-        seqArgsLength = -1;
-    }
-
-    protected void init(TaggedValuePointable[] args) throws SystemException {
-        indexSequence = new int[1];
-        indexSequence[0] = 0;
-        indexSeqArgs = 0;
-
-        if (args[1].getTag() != ValueTag.XS_INT_TAG) {
-            throw new IllegalArgumentException("Expected int value tag, got: " + args[1].getTag());
-        }
-        IntegerPointable ip = (IntegerPointable) IntegerPointable.FACTORY.createPointable();
-        args[1].getValue(ip);
-        int typeCode = ip.getInteger();
-        setFilterCode(typeCode);
-
-        if (args[0].getTag() == ValueTag.SEQUENCE_TAG) {
-            args[0].getValue(seqNtp);
-            seqArgsLength = seqNtp.getEntryCount();
-        } else if (args[0].getTag() == ValueTag.NODE_TREE_TAG) {
-            args[0].getValue(ntp);
-            seqArgsLength = -1;
-        } else {
-            throw new SystemException(ErrorCode.SYSE0001);
-        }
-    }
-
-    public void setFilterCode(int typeCode) {
-        SequenceType sType = dCtx.getStaticContext().lookupSequenceType(typeCode);
-        filter.add(NodeTestFilter.getNodeTestFilter(sType));
-    }
-
-    private void setFilterCode(List<Integer> typeCodes) {
-        for (int typeCode : typeCodes) {
-            SequenceType sType = dCtx.getStaticContext().lookupSequenceType(typeCode);
-            INodeFilter f = NodeTestFilter.getNodeTestFilter(sType);
-            filter.add(f);
-        }
-    }
-
-    private void setNodeToResult(IPointable result) throws IOException {
-        nodeAbvs.reset();
-        NodeSubTreeBuilder nstb = new NodeSubTreeBuilder();
-        nstb.reset(nodeAbvs);
-        nstb.setChildNode(ntp, tvpItem);
-        nstb.finish();
-        result.set(nodeAbvs.getByteArray(), nodeAbvs.getStartOffset(), nodeAbvs.getLength());
-    }
-
-    public boolean step(IPointable result) throws AlgebricksException {
-        TaggedValuePointable tvpRoot = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-        if (seqArgsLength > 0) {
-            while (indexSeqArgs < seqArgsLength) {
-                TaggedValuePointable tvpNtp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-                seqNtp.getEntry(indexSeqArgs, tvpNtp);
-                if (tvpNtp.getTag() != ValueTag.NODE_TREE_TAG) {
-                    String description = ErrorCode.SYSE0001 + ": " + ErrorCode.SYSE0001.getDescription();
-                    throw new AlgebricksException(description);
-                }
-                tvpNtp.getValue(ntp);
-                ntp.getRootNode(tvpRoot);
-                if (stepNodeTree(tvpRoot, 0, result)) {
-                    return true;
-                }
-            }
-        } else {
-            // Single node tree input.
-            ntp.getRootNode(tvpRoot);
-            if (stepNodeTree(tvpRoot, 0, result)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private boolean stepNodeTree(TaggedValuePointable tvpInput, int level, IPointable result) throws AlgebricksException {
-        SequencePointable seqItem = (SequencePointable) SequencePointable.FACTORY.createPointable();
-        getSequence(tvpInput, seqItem);
-        int seqLength = seqItem.getEntryCount();
-        while (indexSequence[level] < seqLength) {
-            // Get the next item
-            seqItem.getEntry(indexSequence[level], tvpItem);
-
-            // Test to see if the item fits the path step
-            if (filter.get(level).accept(ntp, tvpItem)) {
-                if (level + 1 < indexSequence.length) {
-                    if (stepNodeTree(tvpItem, level + 1, result)) {
-                        return true;
-                    }
-                } else {
-                    try {
-                        setNodeToResult(result);
-                        ++indexSequence[level];
-                        return true;
-                    } catch (IOException e) {
-                        String description = ErrorCode.SYSE0001 + ": " + ErrorCode.SYSE0001.getDescription();
-                        throw new AlgebricksException(description);
-                    }
-                }
-            }
-            ++indexSequence[level];
-        }
-        // Reset for next node tree.
-        indexSequence[level] = 0;
-        return false;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepOperatorDescriptor.java
new file mode 100644
index 0000000..1edf0f5
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepOperatorDescriptor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.step;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.SequencePointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.step.NodeTestFilter.INodeFilter;
+import org.apache.vxquery.types.SequenceType;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+
+/**
+ * Iterates, one match per call, over the nodes reached by applying a chain of child path steps
+ * to a single node tree.
+ * <p>
+ * Usage: call {@code init(TaggedValuePointable, List)} with the node tree and one sequence type
+ * code per step, then call {@code step(IPointable)} until it returns {@code false}.
+ * <p>
+ * The node tree pointable {@code ntp} and the {@code getSequence}/{@code setNodeToResult}
+ * helpers are inherited from {@code AbstractChildPathStep} and deliberately not redeclared, so
+ * the tree loaded by {@code init} is the one those helpers read.
+ */
+public class ChildPathStepOperatorDescriptor extends AbstractChildPathStep {
+    /** Node-test filter per path step; entry i is applied to the children visited at depth i. */
+    private final List<INodeFilter> filter = new ArrayList<INodeFilter>();
+    /** Resume position inside the child sequence at each depth, kept across step() calls. */
+    private int[] indexSequence;
+    private final TaggedValuePointable tvpItem = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+    private final TaggedValuePointable tvpStep = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+
+    public ChildPathStepOperatorDescriptor(IHyracksTaskContext ctx, PointablePool pp) {
+        super(ctx, pp);
+    }
+
+    /**
+     * Prepares iteration over a new node tree, discarding the state of any previous call.
+     *
+     * @param tvp
+     *            the input value; must be tagged {@code ValueTag.NODE_TREE_TAG}
+     * @param typeCodes
+     *            one static-context sequence type code per child step, outermost step first
+     * @throws SystemException
+     *             with {@code ErrorCode.SYSE0001} if {@code tvp} is not a node tree
+     */
+    public void init(TaggedValuePointable tvp, List<Integer> typeCodes) throws SystemException {
+        if (tvp.getTag() != ValueTag.NODE_TREE_TAG) {
+            throw new SystemException(ErrorCode.SYSE0001);
+        }
+        // A new int[] is zero-filled, so every depth starts at its first child.
+        indexSequence = new int[typeCodes.size()];
+        // setFilterCode() appends; without clearing, filters from an earlier init() would stay at
+        // indices 0..n-1 (and be used by filter.get(level)) while the list grows on every call.
+        filter.clear();
+        setFilterCode(typeCodes);
+        tvp.getValue(ntp);
+    }
+
+    /**
+     * Appends one node-test filter per type code, resolved through the static context.
+     */
+    protected void setFilterCode(List<Integer> typeCodes) {
+        for (int typeCode : typeCodes) {
+            SequenceType sType = dCtx.getStaticContext().lookupSequenceType(typeCode);
+            filter.add(NodeTestFilter.getNodeTestFilter(sType));
+        }
+    }
+
+    /**
+     * Writes the next matching node of the node tree given to {@code init} into {@code result}.
+     * Requirement: {@code ntp} must hold the node tree (it is loaded by {@code init}).
+     *
+     * @return {@code true} if a node was produced, {@code false} once the tree is exhausted
+     */
+    public boolean step(IPointable result) throws AlgebricksException {
+        ntp.getRootNode(tvpStep);
+        return stepNodeTree(tvpStep, 0, result);
+    }
+
+    /**
+     * Depth-first search for the next node accepted by the filter at every level. The position
+     * at each depth is kept in {@code indexSequence} so a later call resumes where this one stopped.
+     */
+    protected boolean stepNodeTree(TaggedValuePointable tvpInput, int level, IPointable result)
+            throws AlgebricksException {
+        SequencePointable seqItem = pp.takeOne(SequencePointable.class);
+        try {
+            getSequence(tvpInput, seqItem);
+            int seqLength = seqItem.getEntryCount();
+            while (indexSequence[level] < seqLength) {
+                // Get the next item
+                seqItem.getEntry(indexSequence[level], tvpItem);
+
+                // Test to see if the item fits the path step
+                if (filter.get(level).accept(ntp, tvpItem)) {
+                    if (level + 1 < indexSequence.length) {
+                        // Descend. On success keep this level's index: the same child may have
+                        // further matching descendants for the next call.
+                        if (stepNodeTree(tvpItem, level + 1, result)) {
+                            return true;
+                        }
+                    } else {
+                        try {
+                            setNodeToResult(tvpItem, result);
+                        } catch (IOException e) {
+                            String description = ErrorCode.SYSE0001 + ": " + ErrorCode.SYSE0001.getDescription();
+                            throw new AlgebricksException(description, e);
+                        }
+                        ++indexSequence[level];
+                        return true;
+                    }
+                }
+                ++indexSequence[level];
+            }
+            // Reset for next node tree.
+            indexSequence[level] = 0;
+            return false;
+        } finally {
+            pp.giveBack(seqItem);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepScalarEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepScalarEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepScalarEvaluator.java
index d831c95..729f8a2 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepScalarEvaluator.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepScalarEvaluator.java
@@ -31,20 +31,17 @@ import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class ChildPathStepScalarEvaluator extends AbstractTaggedValueArgumentScalarEvaluator {
-    private final SequenceBuilder seqb;
+    private final SequenceBuilder seqb = new SequenceBuilder();
 
-    private final ArrayBackedValueStorage seqAbvs;
+    private final ArrayBackedValueStorage seqAbvs = new ArrayBackedValueStorage();
 
-    private final TaggedValuePointable itemTvp;
+    private final ChildPathStepUnnesting childPathStep;
 
-    private final ChildPathStep childPathStep;
+    private final TaggedValuePointable itemTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
 
     public ChildPathStepScalarEvaluator(IScalarEvaluator[] args, IHyracksTaskContext ctx) {
         super(args);
-        seqb = new SequenceBuilder();
-        seqAbvs = new ArrayBackedValueStorage();
-        itemTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-        childPathStep = new ChildPathStep(ctx);
+        childPathStep = new ChildPathStepUnnesting(ctx, ppool);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnesting.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnesting.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnesting.java
new file mode 100644
index 0000000..1e93ce3
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnesting.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.step;
+
+import java.io.IOException;
+
+import org.apache.vxquery.datamodel.accessors.PointablePool;
+import org.apache.vxquery.datamodel.accessors.SequencePointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.step.NodeTestFilter.INodeFilter;
+import org.apache.vxquery.types.SequenceType;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+
+/**
+ * Unnesting implementation of the child path step.
+ * <p>
+ * Argument 0 is either a single node tree or a sequence of node trees; argument 1 is an {@code xs:int}
+ * code resolved through the static context into the {@link SequenceType} that builds the node test filter.
+ * Each call to {@link #step(IPointable)} produces the next accepted node, resuming where the previous call
+ * stopped.
+ */
+public class ChildPathStepUnnesting extends AbstractChildPathStep {
+    /** Position of the node tree currently being stepped when the input is a sequence. */
+    private int indexSeqArgs;
+    /** Number of entries in the input sequence, or -1 when the input is a single node tree. */
+    private int seqArgsLength;
+    /** Position of the next entry to examine within the current node tree's sequence. */
+    private int indexSequence;
+    private final IntegerPointable ip = (IntegerPointable) IntegerPointable.FACTORY.createPointable();
+    private final SequencePointable seqItem = (SequencePointable) SequencePointable.FACTORY.createPointable();
+    private final SequencePointable seqNtp = (SequencePointable) SequencePointable.FACTORY.createPointable();
+    private final TaggedValuePointable tvpItem = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+    private final TaggedValuePointable tvpNtp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+    private final TaggedValuePointable tvpStep = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+    INodeFilter filter;
+
+    public ChildPathStepUnnesting(IHyracksTaskContext ctx, PointablePool pp) {
+        super(ctx, pp);
+    }
+
+    /**
+     * Resets the iteration state and prepares the node test filter for a new set of arguments.
+     *
+     * @param args args[0] is a node tree or a sequence of node trees; args[1] is the xs:int sequence type code
+     * @throws SystemException SYSE0001 if args[0] is neither a sequence nor a node tree
+     */
+    protected void init(TaggedValuePointable[] args) throws SystemException {
+        indexSeqArgs = 0;
+        indexSequence = 0;
+
+        if (args[1].getTag() != ValueTag.XS_INT_TAG) {
+            throw new IllegalArgumentException("Expected int value tag, got: " + args[1].getTag());
+        }
+        args[1].getValue(ip);
+        SequenceType sType = dCtx.getStaticContext().lookupSequenceType(ip.getInteger());
+        filter = NodeTestFilter.getNodeTestFilter(sType);
+
+        if (args[0].getTag() == ValueTag.SEQUENCE_TAG) {
+            args[0].getValue(seqNtp);
+            seqArgsLength = seqNtp.getEntryCount();
+        } else if (args[0].getTag() == ValueTag.NODE_TREE_TAG) {
+            args[0].getValue(ntp);
+            // -1 marks single node tree input; an empty sequence has length 0.
+            seqArgsLength = -1;
+        } else {
+            throw new SystemException(ErrorCode.SYSE0001);
+        }
+    }
+
+    /**
+     * Writes the next node accepted by the node test filter into {@code result}.
+     *
+     * @param result receives the next accepted node
+     * @return true if a node was produced; false once every input node tree is exhausted
+     * @throws AlgebricksException if a sequence entry is not a node tree or the node cannot be written
+     */
+    public boolean step(IPointable result) throws AlgebricksException {
+        if (seqArgsLength >= 0) {
+            // Sequence input (possibly empty). Move to the next node tree only after the current one is
+            // exhausted; returning true leaves indexSeqArgs in place so the next call resumes there.
+            for (; indexSeqArgs < seqArgsLength; ++indexSeqArgs) {
+                seqNtp.getEntry(indexSeqArgs, tvpNtp);
+                if (tvpNtp.getTag() != ValueTag.NODE_TREE_TAG) {
+                    String description = ErrorCode.SYSE0001 + ": " + ErrorCode.SYSE0001.getDescription();
+                    throw new AlgebricksException(description);
+                }
+                tvpNtp.getValue(ntp);
+                ntp.getRootNode(tvpStep);
+                if (stepNodeTree(tvpStep, 0, result)) {
+                    return true;
+                }
+            }
+        } else {
+            // Single node tree input.
+            ntp.getRootNode(tvpStep);
+            if (stepNodeTree(tvpStep, 0, result)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Finds the next entry of the sequence obtained from {@code tvpInput} (via {@code getSequence}) that the
+     * node test filter accepts, continuing from the position saved by the previous call.
+     *
+     * @param tvpInput root node of the node tree being stepped
+     * @param level currently unused
+     * @param result receives the accepted node
+     * @return true if a node was written to {@code result}; false when the sequence is exhausted, in which
+     *         case the saved position is reset so the next node tree starts from its first entry
+     * @throws AlgebricksException if the accepted node cannot be written to {@code result}
+     */
+    protected boolean stepNodeTree(TaggedValuePointable tvpInput, int level, IPointable result)
+            throws AlgebricksException {
+        getSequence(tvpInput, seqItem);
+        int seqLength = seqItem.getEntryCount();
+        while (indexSequence < seqLength) {
+            // Get the next item
+            seqItem.getEntry(indexSequence, tvpItem);
+
+            // Test to see if the item fits the path step
+            if (filter.accept(ntp, tvpItem)) {
+                try {
+                    setNodeToResult(tvpItem, result);
+                    ++indexSequence;
+                    return true;
+                } catch (IOException e) {
+                    String description = ErrorCode.SYSE0001 + ": " + ErrorCode.SYSE0001.getDescription();
+                    throw new AlgebricksException(description, e);
+                }
+            }
+            ++indexSequence;
+        }
+        // Reset for next node tree.
+        indexSequence = 0;
+        return false;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluator.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluator.java
new file mode 100644
index 0000000..02515c4
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.step;
+
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluator;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+
+/**
+ * Unnesting evaluator for the child path step; delegates initialization and stepping to a
+ * {@link ChildPathStepUnnesting} created once per evaluator.
+ */
+public class ChildPathStepUnnestingEvaluator extends AbstractTaggedValueArgumentUnnestingEvaluator {
+    final ChildPathStepUnnesting childPathStep;
+
+    public ChildPathStepUnnestingEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) {
+        super(args);
+        childPathStep = new ChildPathStepUnnesting(ctx, ppool);
+    }
+
+    @Override
+    public boolean step(IPointable result) throws AlgebricksException {
+        return childPathStep.step(result);
+    }
+
+    @Override
+    protected void init(TaggedValuePointable[] args) throws SystemException {
+        childPathStep.init(args);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluatorFactory.java
index 78b6ae9..fc9844a 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluatorFactory.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/step/ChildPathStepUnnestingEvaluatorFactory.java
@@ -16,9 +16,6 @@
  */
 package org.apache.vxquery.runtime.functions.step;
 
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.exceptions.SystemException;
-import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluator;
 import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluatorFactory;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -26,7 +23,6 @@ import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
 
 public class ChildPathStepUnnestingEvaluatorFactory extends AbstractTaggedValueArgumentUnnestingEvaluatorFactory {
     private static final long serialVersionUID = 1L;
@@ -38,18 +34,6 @@ public class ChildPathStepUnnestingEvaluatorFactory extends AbstractTaggedValueA
     @Override
     protected IUnnestingEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args)
             throws AlgebricksException {
-        final ChildPathStep childPathStep = new ChildPathStep(ctx);
-        
-        return new AbstractTaggedValueArgumentUnnestingEvaluator(args) {
-            @Override
-            public boolean step(IPointable result) throws AlgebricksException {
-                return childPathStep.step(result);
-            }
-
-            @Override
-            protected void init(TaggedValuePointable[] args) throws SystemException {
-                childPathStep.init(args);
-            }
-        };
+        return new ChildPathStepUnnestingEvaluator(ctx, args);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
index 9f1dba8..dbd45cb 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
@@ -20,6 +20,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
@@ -513,157 +514,6 @@ public class FunctionHelper {
         throw new SystemException(ErrorCode.XPTY0004);
     }
 
-    public static void atomize(TaggedValuePointable tvp, IPointable result) throws IOException {
-        NodeTreePointable ntp = (NodeTreePointable) NodeTreePointable.FACTORY.createPointable();
-        switch (tvp.getTag()) {
-            case ValueTag.NODE_TREE_TAG:
-                tvp.getValue(ntp);
-                atomizeNode(ntp, result);
-                break;
-
-            default:
-                result.set(tvp);
-        }
-    }
-
-    public static void atomizeNode(NodeTreePointable ntp, IPointable result) throws IOException {
-        ArrayBackedValueStorage tempABVS = new ArrayBackedValueStorage();
-        TaggedValuePointable tempTVP = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-        VoidPointable vp = (VoidPointable) VoidPointable.FACTORY.createPointable();
-        PointablePool pp = PointablePoolFactory.INSTANCE.createPointablePool();
-        ntp.getRootNode(tempTVP);
-        switch (tempTVP.getTag()) {
-            case ValueTag.ATTRIBUTE_NODE_TAG: {
-                AttributeNodePointable anp = pp.takeOne(AttributeNodePointable.class);
-                try {
-                    tempTVP.getValue(anp);
-                    anp.getValue(ntp, result);
-                } finally {
-                    pp.giveBack(anp);
-                }
-                break;
-            }
-
-            case ValueTag.TEXT_NODE_TAG:
-            case ValueTag.COMMENT_NODE_TAG: {
-                TextOrCommentNodePointable tcnp = pp.takeOne(TextOrCommentNodePointable.class);
-                try {
-                    tempTVP.getValue(tcnp);
-                    tcnp.getValue(ntp, vp);
-                    tempABVS.reset();
-                    tempABVS.getDataOutput().write(ValueTag.XS_UNTYPED_ATOMIC_TAG);
-                    tempABVS.append(vp);
-                    result.set(tempABVS.getByteArray(), tempABVS.getStartOffset(), tempABVS.getLength());
-                } finally {
-                    pp.giveBack(tcnp);
-                }
-                break;
-            }
-
-            case ValueTag.DOCUMENT_NODE_TAG: {
-                DocumentNodePointable dnp = pp.takeOne(DocumentNodePointable.class);
-                SequencePointable sp = pp.takeOne(SequencePointable.class);
-                try {
-                    tempTVP.getValue(dnp);
-                    dnp.getContent(ntp, sp);
-                    buildStringConcatenation(sp, tempABVS, ntp);
-                    result.set(tempABVS.getByteArray(), tempABVS.getStartOffset(), tempABVS.getLength());
-                } finally {
-                    pp.giveBack(sp);
-                    pp.giveBack(dnp);
-                }
-                break;
-            }
-
-            case ValueTag.ELEMENT_NODE_TAG: {
-                ElementNodePointable enp = pp.takeOne(ElementNodePointable.class);
-                SequencePointable sp = pp.takeOne(SequencePointable.class);
-                try {
-                    tempTVP.getValue(enp);
-                    if (enp.childrenChunkExists()) {
-                        enp.getChildrenSequence(ntp, sp);
-                        buildStringConcatenation(sp, tempABVS, ntp);
-                        result.set(tempABVS.getByteArray(), tempABVS.getStartOffset(), tempABVS.getLength());
-                    }
-                } finally {
-                    pp.giveBack(sp);
-                    pp.giveBack(enp);
-                }
-                break;
-            }
-
-            case ValueTag.PI_NODE_TAG: {
-                PINodePointable pnp = pp.takeOne(PINodePointable.class);
-                try {
-                    tempTVP.getValue(pnp);
-                    pnp.getContent(ntp, vp);
-                    tempABVS.reset();
-                    tempABVS.getDataOutput().write(ValueTag.XS_UNTYPED_ATOMIC_TAG);
-                    tempABVS.append(vp);
-                    result.set(tempABVS.getByteArray(), tempABVS.getStartOffset(), tempABVS.getLength());
-                } finally {
-                    pp.giveBack(pnp);
-                }
-                break;
-            }
-
-        }
-    }
-
-    public static void buildStringConcatenation(SequencePointable sp, ArrayBackedValueStorage tempABVS,
-            NodeTreePointable ntp) throws IOException {
-        tempABVS.reset();
-        DataOutput out = tempABVS.getDataOutput();
-        out.write(ValueTag.XS_UNTYPED_ATOMIC_TAG);
-        // Leave room for the utf-8 length
-        out.write(0);
-        out.write(0);
-        buildConcatenationRec(sp, out, ntp);
-        int utflen = tempABVS.getLength() - 3;
-        byte[] bytes = tempABVS.getByteArray();
-        // Patch utf-8 length at bytes 1 and 2
-        bytes[1] = (byte) ((utflen >>> 8) & 0xFF);
-        bytes[2] = (byte) ((utflen >>> 0) & 0xFF);
-    }
-
-    public static void buildConcatenationRec(SequencePointable sp, DataOutput out, NodeTreePointable ntp)
-            throws IOException {
-        TaggedValuePointable tempTVP2 = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
-        VoidPointable vp = (VoidPointable) VoidPointable.FACTORY.createPointable();
-        PointablePool pp = PointablePoolFactory.INSTANCE.createPointablePool();
-        int nItems = sp.getEntryCount();
-        for (int i = 0; i < nItems; ++i) {
-            sp.getEntry(i, tempTVP2);
-            switch (tempTVP2.getTag()) {
-                case ValueTag.TEXT_NODE_TAG: {
-                    TextOrCommentNodePointable tcnp = pp.takeOne(TextOrCommentNodePointable.class);
-                    try {
-                        tempTVP2.getValue(tcnp);
-                        tcnp.getValue(ntp, vp);
-                        out.write(vp.getByteArray(), vp.getStartOffset() + 2, vp.getLength() - 2);
-                    } finally {
-                        pp.giveBack(tcnp);
-                    }
-                    break;
-                }
-                case ValueTag.ELEMENT_NODE_TAG: {
-                    ElementNodePointable enp = pp.takeOne(ElementNodePointable.class);
-                    SequencePointable sp2 = pp.takeOne(SequencePointable.class);
-                    try {
-                        tempTVP2.getValue(enp);
-                        if (enp.childrenChunkExists()) {
-                            enp.getChildrenSequence(ntp, sp2);
-                            buildConcatenationRec(sp2, out, ntp);
-                        }
-                    } finally {
-                        pp.giveBack(sp2);
-                        pp.giveBack(enp);
-                    }
-                }
-            }
-        }
-    }
-
     public static boolean compareTaggedValues(AbstractValueComparisonOperation aOp, TaggedValuePointable tvp1,
             TaggedValuePointable tvp2, DynamicContext dCtx) throws SystemException {
         final TypedPointables tp1 = new TypedPointables();
@@ -1389,39 +1239,24 @@ public class FunctionHelper {
         System.err.println(" printUTF8String END");
     }
 
-    /**
-     * Checks the path has an acceptable file name extension to read in.
-     * 
-     * @param path
-     * @return
-     */
-    public static boolean readableXmlFile(String path) {
-        if (path.toLowerCase().endsWith(".xml") || path.toLowerCase().endsWith(".xml.gz")) {
-            return true;
-        }
-        return false;
-    }
-
     public static void readInDocFromPointable(UTF8StringPointable stringp, InputSource in, ByteBufferInputStream bbis,
-            DataInputStream di, ArrayBackedValueStorage abvs, ITreeNodeIdProvider treeNodeIdProvider)
-            throws SystemException {
+            DataInputStream di, ArrayBackedValueStorage abvs, XMLParser parser) throws SystemException {
         String fName = getStringFromPointable(stringp, bbis, di);
-        readInDocFromString(fName, in, abvs, treeNodeIdProvider);
+        File file = new File(fName);
+        readInDocFromString(file, in, abvs, parser);
     }
 
-    public static void readInDocFromString(String fName, InputSource in, ArrayBackedValueStorage abvs,
-            ITreeNodeIdProvider treeNodeIdProvider) throws SystemException {
-        File file = new File(fName);
-        if (!file.exists()) {
-            throw new RuntimeException("The file (" + fName + ") does not exist.");
-        }
+    public static void readInDocFromString(File file, InputSource in, ArrayBackedValueStorage abvs, XMLParser parser)
+            throws SystemException {
         try {
-            if (fName.toLowerCase().endsWith(".xml.gz")) {
-                in.setCharacterStream(new InputStreamReader(new GZIPInputStream(new FileInputStream(fName))));
+            if (file.getName().toLowerCase().endsWith(".xml.gz")) {
+                in.setCharacterStream(new InputStreamReader(new GZIPInputStream(new FileInputStream(file))));
             } else {
-                in.setCharacterStream(new InputStreamReader(new FileInputStream(fName)));
+                in.setCharacterStream(new InputStreamReader(new FileInputStream(file)));
             }
-            XMLParser.parseInputSource(in, abvs, false, treeNodeIdProvider);
+            parser.parseInputSource(in, abvs);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException("The file (" + file.getName() + ") does not exist.");
         } catch (IOException e) {
             throw new SystemException(ErrorCode.SYSE0001, e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java
index ecf4d7d..a8ec0b9 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java
@@ -162,6 +162,8 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler {
     @Override
     public void startDocument() throws SAXException {
         try {
+            db.reset();
+            docABVS.reset();
             docb.reset(docABVS);
             if (createNodeIds) {
                 docb.setLocalNodeId(nodeIdCounter++);

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/f72feda7/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
index fee41df..c2ada24 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
@@ -20,9 +20,57 @@ import org.xml.sax.InputSource;
 import org.xml.sax.XMLReader;
 import org.xml.sax.helpers.XMLReaderFactory;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class XMLParser {
+    /** SAX reader created once in the constructor and reused by every parse on this instance. */
+    XMLReader parser;
+    /** Content and lexical handler registered on {@code parser}; writes each parsed document out. */
+    SAXContentHandler handler;
+
+    /**
+     * Creates a parser whose SAX reader and handler are reused across calls to
+     * {@link #parseInputSource(InputSource, ArrayBackedValueStorage)}.
+     *
+     * @param attachTypes passed through to the {@link SAXContentHandler}
+     * @param idProvider passed through to the {@link SAXContentHandler}
+     * @throws HyracksDataException if the SAX reader or handler cannot be created or configured
+     */
+    public XMLParser(boolean attachTypes, ITreeNodeIdProvider idProvider) throws HyracksDataException {
+        try {
+            parser = XMLReaderFactory.createXMLReader();
+            handler = new SAXContentHandler(attachTypes, idProvider);
+            parser.setContentHandler(handler);
+            parser.setProperty("http://xml.org/sax/properties/lexical-handler", handler);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Parses {@code in} with the reused reader and writes the resulting document into {@code abvs}.
+     *
+     * @param in the XML input to parse
+     * @param abvs destination for the parsed document
+     * @throws HyracksDataException if parsing or writing fails; the original exception is kept as the cause
+     */
+    public void parseInputSource(InputSource in, ArrayBackedValueStorage abvs) throws HyracksDataException {
+        try {
+            parser.parse(in);
+            handler.write(abvs);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Currently a no-op. The handler resets its per-document builder state itself when a new document
+     * starts (see SAXContentHandler#startDocument).
+     */
+    public void reset() throws SystemException {
+    }
+
     public static void parseInputSource(InputSource in, ArrayBackedValueStorage abvs, boolean attachTypes,
             ITreeNodeIdProvider idProvider) throws SystemException {
         XMLReader parser;