You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2013/09/09 19:32:18 UTC
svn commit: r1521209 - in /incubator/vxquery/trunk/vxquery:
vxquery-cli/src/main/java/org/apache/vxquery/cli/
vxquery-core/src/main/java/org/apache/vxquery/compiler/
vxquery-core/src/main/java/org/apache/vxquery/metadata/
vxquery-core/src/main/java/org...
Author: prestonc
Date: Mon Sep 9 17:32:17 2013
New Revision: 1521209
URL: http://svn.apache.org/r1521209
Log:
Updated VXQuery to use the DistributedResult operator. The operator allows for multiple node controllers to be used during execution. The CLI will now work with a cluster of node controllers. In addition to cluster support, the CLI has been modified to run multiple node controllers on a local system for testing purposes. A command line option -local-node-controllers can specify the number of nodes and ${nodeId} can be added to the collection function to distinguish between different data partitions.
Added:
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java (with props)
Modified:
incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java
incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml
Modified: incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java Mon Sep 9 17:32:17 2013
@@ -16,7 +16,9 @@ package org.apache.vxquery.cli;
import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
import java.io.StringReader;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@@ -47,22 +49,32 @@ import edu.uci.ics.hyracks.algebricks.co
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
public class VXQuery {
private final CmdLineOptions opts;
private ClusterControllerService cc;
- private NodeControllerService nc1;
- private NodeControllerService nc2;
+ private NodeControllerService[] ncs;
private IHyracksClientConnection hcc;
+ private IHyracksDataset hds;
+
+ private ResultSetId resultSetId;
public VXQuery(CmdLineOptions opts) {
this.opts = opts;
@@ -157,10 +169,19 @@ public class VXQuery {
}
}
};
- FileSplit[] fileSplits = getFileSplits(opts.outfileSplits);
- XMLQueryCompiler compiler = new XMLQueryCompiler(listener);
- CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(
- RootStaticContextImpl.INSTANCE), fileSplits);
+
+ // Get cluster node configuration.
+ Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos();
+ String[] nodeList = new String[nodeControllerInfos.size()];
+ int index = 0;
+ for (String node : nodeControllerInfos.keySet()) {
+ nodeList[index++] = node;
+ }
+
+ XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeList);
+ resultSetId = createResultSetId();
+ CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
+ resultSetId);
compiler.compile(query, new StringReader(qStr), ccb, opts.optimizationLevel);
if (opts.compileOnly) {
continue;
@@ -172,58 +193,39 @@ public class VXQuery {
DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext());
js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory()));
+ PrintWriter writer = new PrintWriter(System.out, true);
for (int i = 0; i < opts.repeatExec; ++i) {
- runJob(js, fileSplits);
+ runJob(js, writer);
}
}
}
-
- private FileSplit[] getFileSplits(String arg) throws IOException {
- if (arg == null) {
- File result = createTempFile("test");
- return new FileSplit[] {
- new FileSplit("nc1", result.getAbsolutePath())
- };
- } else {
- String[] fileIds = arg.split(",");
- FileSplit[] splits = new FileSplit[fileIds.length];
- for (int i = 0; i < fileIds.length; ++i) {
- String[] components = fileIds[i].split(":");
- System.err.println(components);
- splits[i] = new FileSplit(components[0], components[1]);
- }
- return splits;
+
+ private void runJob(JobSpecification spec, PrintWriter writer) throws Exception {
+ if (hds == null) {
+ hds = new HyracksDataset(hcc, spec.getFrameSize(), 1);
}
- }
- private void runJob(JobSpecification spec, FileSplit[] fileSplits) throws Exception {
JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
- hcc.waitForCompletion(jobId);
- if (opts.outfileSplits == null) {
- File result = fileSplits[0].getLocalFile().getFile();
- dumpOutputFiles(result);
- } else {
- System.err.println("Results in:");
- for (FileSplit fs : fileSplits) {
- System.out.println(fs.getNodeName() + ":" + fs.getLocalFile().toString());
- }
- }
- }
- private void dumpOutputFiles(File f) throws IOException {
- if (f.exists() && f.isFile()) {
- System.err.println("Reading file: " + f.getAbsolutePath());
- String data = FileUtils.readFileToString(f);
- System.err.println(data);
+ ByteBuffer buffer = ByteBuffer.allocate(spec.getFrameSize());
+ IHyracksDatasetReader reader = hds.createReader(jobId, resultSetId);
+ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+ buffer.clear();
+
+ while (reader.read(buffer) > 0) {
+ buffer.clear();
+ writer.print(ResultUtils.getStringFromBuffer(buffer, frameTupleAccessor));
+ writer.flush();
}
+
+ hcc.waitForCompletion(jobId);
}
- protected File createTempFile(String name) throws IOException {
- System.err.println("Name: " + name);
- File tempFile = File.createTempFile(name, ".tmp");
- System.err.println("Output file: " + tempFile.getAbsolutePath());
- tempFile.deleteOnExit();
- return tempFile;
+ /**
+ * Create a unique result set id to get the correct query back from the cluster.
+ */
+ protected ResultSetId createResultSetId() {
+ return new ResultSetId(System.nanoTime());
}
public void startLocalHyracks() throws Exception {
@@ -242,32 +244,26 @@ public class VXQuery {
cc = new ClusterControllerService(ccConfig);
cc.start();
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
- ncConfig1.nodeId = "nc1";
- nc1 = new NodeControllerService(ncConfig1);
- nc1.start();
-
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.ccPort = 39001;
- ncConfig2.clusterNetIPAddress = "127.0.0.1";
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.datasetIPAddress = "127.0.0.1";
- ncConfig2.nodeId = "nc2";
- nc2 = new NodeControllerService(ncConfig2);
- nc2.start();
+ ncs = new NodeControllerService[opts.localNodeControllers];
+ for (int i = 0; i < ncs.length; i++) {
+ NCConfig ncConfig = new NCConfig();
+ ncConfig.ccHost = "localhost";
+ ncConfig.ccPort = 39001;
+ ncConfig.clusterNetIPAddress = "127.0.0.1";
+ ncConfig.dataIPAddress = "127.0.0.1";
+ ncConfig.datasetIPAddress = "127.0.0.1";
+ ncConfig.nodeId = "nc" + (i + 1);
+ ncs[i] = new NodeControllerService(ncConfig);
+ ncs[i].start();
+ }
hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
}
public void stopLocalHyracks() throws Exception {
- nc2.stop();
- nc1.stop();
+ for (int i = 0; i < ncs.length; i++) {
+ ncs[i].stop();
+ }
cc.stop();
}
@@ -282,8 +278,8 @@ public class VXQuery {
@Option(name = "-client-net-port", usage = "Port of the ClusterController (default 1098)")
public int clientNetPort = 1098;
- @Option(name = "-outfile-splits", usage = "Output file splits (e.g. \"nc1:/tmp/foo,nc2:/tmp/bar\"")
- public String outfileSplits;
+ @Option(name = "-local-node-controllers", usage = "Number of local node controllers (default 1)")
+ public int localNodeControllers = 1;
@Option(name = "-O", usage = "Optimization Level. Default: Full Optimization")
private int optimizationLevel = Integer.MAX_VALUE;
@@ -318,4 +314,31 @@ public class VXQuery {
@Argument
private List<String> arguments = new ArrayList<String>();
}
+
+ public static class ResultUtils {
+ public static String getStringFromBuffer(ByteBuffer buffer, IFrameTupleAccessor fta)
+ throws HyracksDataException {
+ String resultRecords = "";
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ try {
+ fta.reset(buffer);
+ for (int tIndex = 0; tIndex < fta.getTupleCount(); tIndex++) {
+ int start = fta.getTupleStartOffset(tIndex);
+ int length = fta.getTupleEndOffset(tIndex) - start;
+ bbis.setByteBuffer(buffer, start);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords += new String(recordBytes, 0, length);
+ }
+ } finally {
+ try {
+ bbis.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return resultRecords;
+ }
+ }
+
}
\ No newline at end of file
Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/compiler/CompilerControlBlock.java Mon Sep 9 17:32:17 2013
@@ -18,23 +18,23 @@ package org.apache.vxquery.compiler;
import org.apache.vxquery.context.StaticContext;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
public class CompilerControlBlock {
private final StaticContext ctx;
+
+ private ResultSetId resultSetId;
- private final FileSplit[] resultFileSplits;
-
- public CompilerControlBlock(StaticContext ctx, FileSplit[] resultFileSplits) {
+ public CompilerControlBlock(StaticContext ctx, ResultSetId resultSetId) {
this.ctx = ctx;
- this.resultFileSplits = resultFileSplits;
+ this.resultSetId = resultSetId;
}
public StaticContext getStaticContext() {
return ctx;
}
- public FileSplit[] getResultFileSplits() {
- return resultFileSplits;
+ public ResultSetId getResultSetId() {
+ return resultSetId;
}
}
\ No newline at end of file
Added: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java?rev=1521209&view=auto
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java (added)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java Mon Sep 9 17:32:17 2013
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.metadata;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ResultSetDomain;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+
+public class QueryResultSetDataSink implements IDataSink {
+
+ private ResultSetId id;
+ private Object[] schemaTypes;
+
+ public QueryResultSetDataSink(ResultSetId id, Object[] schemaTypes) {
+ this.id = id;
+ this.schemaTypes = schemaTypes;
+ }
+
+ @Override
+ public ResultSetId getId() {
+ return id;
+ }
+
+ @Override
+ public Object[] getSchemaTypes() {
+ return schemaTypes;
+ }
+
+ @Override
+ public IPartitioningProperty getPartitioningProperty() {
+ return new RandomPartitioningProperty(new ResultSetDomain());
+ }
+}
\ No newline at end of file
Propchange: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/QueryResultSetDataSink.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java Mon Sep 9 17:32:17 2013
@@ -66,6 +66,7 @@ public class VXQueryCollectionOperatorDe
final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
+ final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
@Override
@@ -77,7 +78,8 @@ public class VXQueryCollectionOperatorDe
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
fta.reset(buffer);
- File collectionDirectory = new File(collectionName);
+ String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
+ File collectionDirectory = new File(collectionModifiedName);
// Go through each tuple.
for (int t = 0; t < fta.getTupleCount(); ++t) {
Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java Mon Sep 9 17:32:17 2013
@@ -16,6 +16,7 @@
*/
package org.apache.vxquery.metadata;
+import java.io.IOException;
import java.util.List;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -33,17 +34,29 @@ import edu.uci.ics.hyracks.algebricks.co
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
public class VXQueryMetadataProvider implements IMetadataProvider<String, String> {
+ String[] nodeList;
+
+ public void setNodeList(String[] nodeList) {
+ this.nodeList = nodeList;
+ }
+
@Override
public IDataSource<String> findDataSource(String id) throws AlgebricksException {
return null;
@@ -59,10 +72,7 @@ public class VXQueryMetadataProvider imp
IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds.getId(),
ds.getDataSourceId(), ds.getTotalDataSources(), rDesc);
- // TODO review if locations needs to be updated for parallel processing.
- String[] locations = new String[1];
- locations[0] = "nc1";
- AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
+ AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(nodeList);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint);
}
@@ -143,7 +153,23 @@ public class VXQueryMetadataProvider imp
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
JobSpecification spec) throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
+ QueryResultSetDataSink rsds = (QueryResultSetDataSink) sink;
+ ResultSetId rssId = (ResultSetId) rsds.getId();
+
+ IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
+ IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
+
+ ResultWriterOperatorDescriptor resultWriter = null;
+ try {
+ IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
+ .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, writerFactory);
+ resultWriter = new ResultWriterOperatorDescriptor(spec, rssId, ordered, false,
+ resultSerializedAppenderFactory);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, null);
}
+
}
\ No newline at end of file
Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java Mon Sep 9 17:32:17 2013
@@ -81,10 +81,10 @@ public class XMLQueryCompiler {
private Module module;
private ICompiler compiler;
-
+
private int frameSize = 65536;
- public XMLQueryCompiler(XQueryCompilationListener listener) {
+ public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList) {
this.listener = listener == null ? NoopXQueryCompilationListener.INSTANCE : listener;
HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
new IOptimizationContextFactory() {
@@ -138,9 +138,10 @@ public class XMLQueryCompiler {
}
});
builder.setNullWriterFactory(new VXQueryNullWriterFactory());
- builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(new String[] { "nc1" }));
+ builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(nodeList));
cFactory = builder.create();
mdProvider = new VXQueryMetadataProvider();
+ mdProvider.setNodeList(nodeList);
}
public void compile(String name, Reader query, CompilerControlBlock ccb, int optimizationLevel)
Modified: incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/translator/XMLQueryTranslator.java Mon Sep 9 17:32:17 2013
@@ -49,7 +49,7 @@ import org.apache.vxquery.functions.Exte
import org.apache.vxquery.functions.Function;
import org.apache.vxquery.functions.Signature;
import org.apache.vxquery.functions.UserDefinedXQueryFunction;
-import org.apache.vxquery.metadata.QueryResultDataSink;
+import org.apache.vxquery.metadata.QueryResultSetDataSink;
import org.apache.vxquery.runtime.functions.cast.CastToDecimalOperation;
import org.apache.vxquery.types.AnyItemType;
import org.apache.vxquery.types.AnyNodeType;
@@ -168,13 +168,13 @@ import edu.uci.ics.hyracks.algebricks.co
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -661,7 +661,8 @@ public class XMLQueryTranslator {
unnest.getInputs().add(mutable(tCtx.op));
List<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
exprs.add(mutable(vre(iLVar)));
- WriteOperator op = new WriteOperator(exprs, new QueryResultDataSink(ccb.getResultFileSplits()));
+ QueryResultSetDataSink sink = new QueryResultSetDataSink(ccb.getResultSetId(), null);
+ DistributeResultOperator op = new DistributeResultOperator(exprs, sink);
op.getInputs().add(mutable(unnest));
ALogicalPlanImpl lp = new ALogicalPlanImpl(mutable(op));
Modified: incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml?rev=1521209&r1=1521208&r2=1521209&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-parent/pom.xml Mon Sep 9 17:32:17 2013
@@ -192,6 +192,12 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-client</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
<version>${hyracks.version}</version>
</dependency>