You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2013/09/09 19:32:18 UTC

svn commit: r1521209 - in /incubator/vxquery/trunk/vxquery: vxquery-cli/src/main/java/org/apache/vxquery/cli/ vxquery-core/src/main/java/org/apache/vxquery/compiler/ vxquery-core/src/main/java/org/apache/vxquery/metadata/ vxquery-core/src/main/java/org...

Author: prestonc
Date: Mon Sep  9 17:32:17 2013
New Revision: 1521209

URL: http://svn.apache.org/r1521209
Log:
Updated VXQuery to use the DistributeResult operator, which allows multiple node controllers to be used during execution. The CLI now works with a cluster of node controllers. In addition to cluster support, the CLI has been modified to run multiple node controllers on the local system for testing purposes. The command line option -local-node-controllers specifies the number of local node controllers (default 1), and ${nodeId} can be included in the collection name passed to the collection function to distinguish between different data partitions; each node controller replaces it with its own node id.

Added:
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java   (with props)
Modified:
    incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
    incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java
    incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml

Modified: incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java Mon Sep  9 17:32:17 2013
@@ -16,7 +16,9 @@ package org.apache.vxquery.cli;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.StringReader;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -47,22 +49,32 @@ import edu.uci.ics.hyracks.algebricks.co
 import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
 public class VXQuery {
     private final CmdLineOptions opts;
 
     private ClusterControllerService cc;
-    private NodeControllerService nc1;
-    private NodeControllerService nc2;
+    private NodeControllerService[] ncs;
     private IHyracksClientConnection hcc;
+    private IHyracksDataset hds;
+
+    private ResultSetId resultSetId;
 
     public VXQuery(CmdLineOptions opts) {
         this.opts = opts;
@@ -157,10 +169,19 @@ public class VXQuery {
                     }
                 }
             };
-            FileSplit[] fileSplits = getFileSplits(opts.outfileSplits);
-            XMLQueryCompiler compiler = new XMLQueryCompiler(listener);
-            CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(
-                    RootStaticContextImpl.INSTANCE), fileSplits);
+
+            // Get cluster node configuration.
+            Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos();
+            String[] nodeList = new String[nodeControllerInfos.size()];
+            int index = 0;
+            for (String node : nodeControllerInfos.keySet()) {
+                nodeList[index++] = node;
+            }
+
+            XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeList);
+            resultSetId = createResultSetId();
+            CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
+                    resultSetId);
             compiler.compile(query, new StringReader(qStr), ccb, opts.optimizationLevel);
             if (opts.compileOnly) {
                 continue;
@@ -172,58 +193,39 @@ public class VXQuery {
             DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext());
             js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory()));
 
+            PrintWriter writer = new PrintWriter(System.out, true);
             for (int i = 0; i < opts.repeatExec; ++i) {
-                runJob(js, fileSplits);
+                runJob(js, writer);
             }
         }
     }
-    
-    private FileSplit[] getFileSplits(String arg) throws IOException {
-        if (arg == null) {
-            File result = createTempFile("test");
-            return new FileSplit[] {
-                new FileSplit("nc1", result.getAbsolutePath())
-            };
-        } else {
-            String[] fileIds = arg.split(",");
-            FileSplit[] splits = new FileSplit[fileIds.length];
-            for (int i = 0; i < fileIds.length; ++i) {
-                String[] components = fileIds[i].split(":");
-                System.err.println(components);
-                splits[i] = new FileSplit(components[0], components[1]);
-            }
-            return splits;
+
+    private void runJob(JobSpecification spec, PrintWriter writer) throws Exception {
+        if (hds == null) {
+            hds = new HyracksDataset(hcc, spec.getFrameSize(), 1);
         }
-    }
 
-    private void runJob(JobSpecification spec, FileSplit[] fileSplits) throws Exception {
         JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-        hcc.waitForCompletion(jobId);
-        if (opts.outfileSplits == null) {
-            File result = fileSplits[0].getLocalFile().getFile();
-            dumpOutputFiles(result);
-        } else {
-            System.err.println("Results in:");
-            for (FileSplit fs : fileSplits) {
-                System.out.println(fs.getNodeName() + ":" + fs.getLocalFile().toString());
-            }
-        }
-    }
 
-    private void dumpOutputFiles(File f) throws IOException {
-        if (f.exists() && f.isFile()) {
-            System.err.println("Reading file: " + f.getAbsolutePath());
-            String data = FileUtils.readFileToString(f);
-            System.err.println(data);
+        ByteBuffer buffer = ByteBuffer.allocate(spec.getFrameSize());
+        IHyracksDatasetReader reader = hds.createReader(jobId, resultSetId);
+        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+        buffer.clear();
+
+        while (reader.read(buffer) > 0) {
+            buffer.clear();
+            writer.print(ResultUtils.getStringFromBuffer(buffer, frameTupleAccessor));
+            writer.flush();
         }
+
+        hcc.waitForCompletion(jobId);
     }
 
-    protected File createTempFile(String name) throws IOException {
-        System.err.println("Name: " + name);
-        File tempFile = File.createTempFile(name, ".tmp");
-        System.err.println("Output file: " + tempFile.getAbsolutePath());
-        tempFile.deleteOnExit();
-        return tempFile;
+    /**
+     * Create a unique result set id to get the correct query back from the cluster.
+     */
+    protected ResultSetId createResultSetId() {
+        return new ResultSetId(System.nanoTime());
     }
 
     public void startLocalHyracks() throws Exception {
@@ -242,32 +244,26 @@ public class VXQuery {
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
-        NCConfig ncConfig1 = new NCConfig();
-        ncConfig1.ccHost = "localhost";
-        ncConfig1.ccPort = 39001;
-        ncConfig1.clusterNetIPAddress = "127.0.0.1";
-        ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.datasetIPAddress = "127.0.0.1";
-        ncConfig1.nodeId = "nc1";
-        nc1 = new NodeControllerService(ncConfig1);
-        nc1.start();
-
-        NCConfig ncConfig2 = new NCConfig();
-        ncConfig2.ccHost = "localhost";
-        ncConfig2.ccPort = 39001;
-        ncConfig2.clusterNetIPAddress = "127.0.0.1";
-        ncConfig2.dataIPAddress = "127.0.0.1";
-        ncConfig2.datasetIPAddress = "127.0.0.1";
-        ncConfig2.nodeId = "nc2";
-        nc2 = new NodeControllerService(ncConfig2);
-        nc2.start();
+        ncs = new NodeControllerService[opts.localNodeControllers];
+        for (int i = 0; i < ncs.length; i++) {
+            NCConfig ncConfig = new NCConfig();
+            ncConfig.ccHost = "localhost";
+            ncConfig.ccPort = 39001;
+            ncConfig.clusterNetIPAddress = "127.0.0.1";
+            ncConfig.dataIPAddress = "127.0.0.1";
+            ncConfig.datasetIPAddress = "127.0.0.1";
+            ncConfig.nodeId = "nc" + (i + 1);
+            ncs[i] = new NodeControllerService(ncConfig);
+            ncs[i].start();
+        }
 
         hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
     }
 
     public void stopLocalHyracks() throws Exception {
-        nc2.stop();
-        nc1.stop();
+        for (int i = 0; i < ncs.length; i++) {
+            ncs[i].stop();
+        }
         cc.stop();
     }
 
@@ -282,8 +278,8 @@ public class VXQuery {
         @Option(name = "-client-net-port", usage = "Port of the ClusterController (default 1098)")
         public int clientNetPort = 1098;
 
-        @Option(name = "-outfile-splits", usage = "Output file splits (e.g. \"nc1:/tmp/foo,nc2:/tmp/bar\"")
-        public String outfileSplits;
+        @Option(name = "-local-node-controllers", usage = "Number of local node controllers (default 1)")
+        public int localNodeControllers = 1;
 
         @Option(name = "-O", usage = "Optimization Level. Default: Full Optimization")
         private int optimizationLevel = Integer.MAX_VALUE;
@@ -318,4 +314,53 @@ public class VXQuery {
         @Argument
         private List<String> arguments = new ArrayList<String>();
     }
+
+    /**
+     * Helpers for turning result frames read back from the cluster into text.
+     */
+    public static class ResultUtils {
+        /**
+         * Decodes every tuple in a result frame and concatenates the decoded text.
+         *
+         * @param buffer
+         *            frame holding the result tuples; it is read through the tuple offsets reported by fta
+         * @param fta
+         *            accessor used to locate the tuples; it is reset onto {@code buffer}
+         * @return the text of all tuples in the frame, in tuple order (empty when the frame has no tuples)
+         * @throws HyracksDataException
+         *             if closing the internal stream fails
+         */
+        public static String getStringFromBuffer(ByteBuffer buffer, IFrameTupleAccessor fta)
+                throws HyracksDataException {
+            // A builder avoids re-copying the accumulated text for every tuple.
+            StringBuilder resultRecords = new StringBuilder();
+            ByteBufferInputStream bbis = new ByteBufferInputStream();
+            try {
+                fta.reset(buffer);
+                int tupleCount = fta.getTupleCount();
+                for (int tIndex = 0; tIndex < tupleCount; tIndex++) {
+                    int start = fta.getTupleStartOffset(tIndex);
+                    int length = fta.getTupleEndOffset(tIndex) - start;
+                    bbis.setByteBuffer(buffer, start);
+                    byte[] recordBytes = new byte[length];
+                    // Only decode the bytes that were actually copied; a short read must not
+                    // leak the zero-filled tail of recordBytes into the output.
+                    int bytesRead = bbis.read(recordBytes, 0, length);
+                    if (bytesRead > 0) {
+                        // NOTE(review): decodes with the platform default charset - confirm it
+                        // matches the encoding used by the result serializer.
+                        resultRecords.append(new String(recordBytes, 0, bytesRead));
+                    }
+                }
+            } finally {
+                try {
+                    bbis.close();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            return resultRecords.toString();
+        }
+    }
+
 }
\ No newline at end of file

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java Mon Sep  9 17:32:17 2013
@@ -18,23 +18,40 @@ package org.apache.vxquery.compiler;
 
 import org.apache.vxquery.context.StaticContext;
 
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 
+/**
+ * Holds the static context and the result set id handed to the query compiler.
+ */
 public class CompilerControlBlock {
     private final StaticContext ctx;
+
+    private final ResultSetId resultSetId;
 
-    private final FileSplit[] resultFileSplits;
-
-    public CompilerControlBlock(StaticContext ctx, FileSplit[] resultFileSplits) {
+    /**
+     * Creates a control block from its two parts.
+     *
+     * @param ctx
+     *            static context returned by {@link #getStaticContext()}
+     * @param resultSetId
+     *            result set id returned by {@link #getResultSetId()}
+     */
+    public CompilerControlBlock(StaticContext ctx, ResultSetId resultSetId) {
         this.ctx = ctx;
-        this.resultFileSplits = resultFileSplits;
+        this.resultSetId = resultSetId;
     }
 
+    /**
+     * @return the static context passed to the constructor
+     */
     public StaticContext getStaticContext() {
         return ctx;
     }
 
-    public FileSplit[] getResultFileSplits() {
-        return resultFileSplits;
+    /**
+     * @return the result set id passed to the constructor
+     */
+    public ResultSetId getResultSetId() {
+        return resultSetId;
     }
 }
\ No newline at end of file

Added: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java?rev=1521209&view=auto
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java (added)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java Mon Sep  9 17:32:17 2013
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.metadata;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ResultSetDomain;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+
+/**
+ * Data sink identified by a result set id.
+ */
+public class QueryResultSetDataSink implements IDataSink {
+
+    /** Id of the result set this sink stands for. */
+    private ResultSetId resultSetId;
+
+    /** Schema types reported by this sink; held by reference, never copied. */
+    private Object[] types;
+
+    /**
+     * Builds a sink for the given result set.
+     *
+     * @param id
+     *            id handed back by {@link #getId()}
+     * @param schemaTypes
+     *            array handed back by {@link #getSchemaTypes()}
+     */
+    public QueryResultSetDataSink(ResultSetId id, Object[] schemaTypes) {
+        resultSetId = id;
+        types = schemaTypes;
+    }
+
+    /**
+     * @return the result set id given at construction
+     */
+    @Override
+    public ResultSetId getId() {
+        return this.resultSetId;
+    }
+
+    /**
+     * @return the schema types array given at construction (the same instance, not a copy)
+     */
+    @Override
+    public Object[] getSchemaTypes() {
+        return this.types;
+    }
+
+    /**
+     * @return a new random partitioning property over a new {@link ResultSetDomain} on every call
+     */
+    @Override
+    public IPartitioningProperty getPartitioningProperty() {
+        ResultSetDomain domain = new ResultSetDomain();
+        return new RandomPartitioningProperty(domain);
+    }
+}
\ No newline at end of file

Propchange: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java Mon Sep  9 17:32:17 2013
@@ -66,6 +66,7 @@ public class VXQueryCollectionOperatorDe
         final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
         final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition();
         final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
+        final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
 
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override
@@ -77,7 +78,8 @@ public class VXQueryCollectionOperatorDe
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 fta.reset(buffer);
-                File collectionDirectory = new File(collectionName);
+                String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
+                File collectionDirectory = new File(collectionModifiedName);
 
                 // Go through each tuple.
                 for (int t = 0; t < fta.getTupleCount(); ++t) {

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java Mon Sep  9 17:32:17 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.vxquery.metadata;
 
+import java.io.IOException;
 import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -33,17 +34,29 @@ import edu.uci.ics.hyracks.algebricks.co
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 
 public class VXQueryMetadataProvider implements IMetadataProvider<String, String> {
+    String[] nodeList;
+
+    public void setNodeList(String[] nodeList) {
+        this.nodeList = nodeList;
+    }
+
     @Override
     public IDataSource<String> findDataSource(String id) throws AlgebricksException {
         return null;
@@ -59,10 +72,7 @@ public class VXQueryMetadataProvider imp
         IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds.getId(),
                 ds.getDataSourceId(), ds.getTotalDataSources(), rDesc);
 
-        // TODO review if locations needs to be updated for parallel processing.
-        String[] locations = new String[1];
-        locations[0] = "nc1";
-        AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
+        AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(nodeList);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint);
     }
 
@@ -143,7 +153,23 @@ public class VXQueryMetadataProvider imp
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
             JobSpecification spec) throws AlgebricksException {
-        // TODO Auto-generated method stub
-        return null;
+        // Build the operator that writes this job's output to the result set named by the sink.
+        // The sink is expected to be a QueryResultSetDataSink, whose getId() already returns a ResultSetId.
+        ResultSetId rssId = ((QueryResultSetDataSink) sink).getId();
+
+        IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
+        IResultSerializerFactoryProvider serializerProvider = ResultSerializerFactoryProvider.INSTANCE;
+
+        try {
+            IResultSerializerFactory serializerFactory = serializerProvider
+                    .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, writerFactory);
+            ResultWriterOperatorDescriptor writer = new ResultWriterOperatorDescriptor(spec, rssId, ordered,
+                    false, serializerFactory);
+            // The result writer is returned without a partition constraint.
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(writer, null);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
     }
+
 }
\ No newline at end of file

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java Mon Sep  9 17:32:17 2013
@@ -81,10 +81,10 @@ public class XMLQueryCompiler {
     private Module module;
 
     private ICompiler compiler;
-    
+
     private int frameSize = 65536;
 
-    public XMLQueryCompiler(XQueryCompilationListener listener) {
+    public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList) {
         this.listener = listener == null ? NoopXQueryCompilationListener.INSTANCE : listener;
         HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
                 new IOptimizationContextFactory() {
@@ -138,9 +138,10 @@ public class XMLQueryCompiler {
             }
         });
         builder.setNullWriterFactory(new VXQueryNullWriterFactory());
-        builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(new String[] { "nc1" }));
+        builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(nodeList));
         cFactory = builder.create();
         mdProvider = new VXQueryMetadataProvider();
+        mdProvider.setNodeList(nodeList);
     }
 
     public void compile(String name, Reader query, CompilerControlBlock ccb, int optimizationLevel)

Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java Mon Sep  9 17:32:17 2013
@@ -49,7 +49,7 @@ import org.apache.vxquery.functions.Exte
 import org.apache.vxquery.functions.Function;
 import org.apache.vxquery.functions.Signature;
 import org.apache.vxquery.functions.UserDefinedXQueryFunction;
-import org.apache.vxquery.metadata.QueryResultDataSink;
+import org.apache.vxquery.metadata.QueryResultSetDataSink;
 import org.apache.vxquery.runtime.functions.cast.CastToDecimalOperation;
 import org.apache.vxquery.types.AnyItemType;
 import org.apache.vxquery.types.AnyNodeType;
@@ -168,13 +168,13 @@ import edu.uci.ics.hyracks.algebricks.co
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -661,7 +661,8 @@ public class XMLQueryTranslator {
         unnest.getInputs().add(mutable(tCtx.op));
         List<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
         exprs.add(mutable(vre(iLVar)));
-        WriteOperator op = new WriteOperator(exprs, new QueryResultDataSink(ccb.getResultFileSplits()));
+        QueryResultSetDataSink sink = new QueryResultSetDataSink(ccb.getResultSetId(), null);
+        DistributeResultOperator op = new DistributeResultOperator(exprs, sink);
         op.getInputs().add(mutable(unnest));
         ALogicalPlanImpl lp = new ALogicalPlanImpl(mutable(op));
 

Modified: incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml Mon Sep  9 17:32:17 2013
@@ -192,6 +192,12 @@
 
     <dependency>
       <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-client</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
       <artifactId>hyracks-control-cc</artifactId>
       <version>${hyracks.version}</version>
     </dependency>