You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by ti...@apache.org on 2018/10/16 02:15:22 UTC
vxquery git commit: Move to Hyracks 0.3.3
Repository: vxquery
Updated Branches:
refs/heads/master 33b3b79e3 -> 5d1175d2c
Move to Hyracks 0.3.3
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/5d1175d2
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/5d1175d2
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/5d1175d2
Branch: refs/heads/master
Commit: 5d1175d2cb04a54ba751295f2ac67daec38bf723
Parents: 33b3b79
Author: Till Westmann <ti...@apache.org>
Authored: Sun Jul 1 11:28:11 2018 -0700
Committer: Till Westmann <ti...@apache.org>
Committed: Mon Oct 15 19:11:31 2018 -0700
----------------------------------------------------------------------
pom.xml | 4 +-
.../VXQueryComparatorFactoryProvider.java | 10 ++-
.../rules/IntroduceTwoStepAggregateRule.java | 45 ++++++++------
.../VXQueryCollectionOperatorDescriptor.java | 7 ++-
.../VXQueryIndexingOperatorDescriptor.java | 6 +-
.../ShowIndexesScalarEvaluatorFactory.java | 2 +-
.../FnDocAvailableScalarEvaluatorFactory.java | 2 +-
.../node/FnDocScalarEvaluatorFactory.java | 2 +-
.../apache/vxquery/app/VXQueryApplication.java | 33 ++++++++--
.../vxquery/app/util/LocalClusterUtil.java | 65 +++++++-------------
.../vxquery/rest/request/QueryRequest.java | 2 +-
.../vxquery/rest/service/VXQueryService.java | 6 +-
.../apache/vxquery/xtest/TestClusterUtil.java | 36 ++---------
.../java/org/apache/vxquery/xtest/XTest.java | 2 +-
.../vxquery/xtest/AbstractXQueryTest.java | 4 +-
15 files changed, 111 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8b634f0..39a90e0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -236,7 +236,7 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.9</version>
+ <version>2.33</version>
</dependency>
<dependency>
@@ -762,7 +762,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<hyracks.fullstack.version>0.3.1</hyracks.fullstack.version>
- <hyracks.version>0.3.0</hyracks.version>
+ <hyracks.version>0.3.3</hyracks.version>
<lucene.version>5.5.1</lucene.version>
<hadoop.version>2.7.0</hadoop.version>
<apache-rat-plugin.version>0.11</apache-rat-plugin.version>
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java
index b7196cf..a510e1c 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/algebricks/VXQueryComparatorFactoryProvider.java
@@ -16,12 +16,12 @@
*/
package org.apache.vxquery.compiler.algebricks;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
public class VXQueryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider {
@Override
@@ -30,6 +30,12 @@ public class VXQueryComparatorFactoryProvider implements IBinaryComparatorFactor
return new BinaryComparatorFactory(type, ascending);
}
+ @Override
+ public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending, boolean ignoreCase)
+ throws AlgebricksException {
+ throw new NotImplementedException();
+ }
+
private static class BinaryComparatorFactory implements IBinaryComparatorFactory {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
index 806b532..962b851 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
@@ -17,12 +17,10 @@
package org.apache.vxquery.compiler.rewriter.rules;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.vxquery.functions.BuiltinFunctions;
-import org.apache.vxquery.functions.BuiltinOperators;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -37,6 +35,8 @@ import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.functions.BuiltinOperators;
/**
* The rule searches for aggregate operators with an aggregate function
@@ -91,28 +91,37 @@ public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule {
if (op.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
return false;
}
- AggregateOperator aggregate = (AggregateOperator) op;
- if (aggregate.getExpressions().size() == 0) {
+ final AggregateOperator aggregate = (AggregateOperator) op;
+ AggregateFunctionCallExpression aggregateFunctionCall = getAggregateFunctionCall(aggregate);
+ if (aggregateFunctionCall == null || aggregateFunctionCall.isTwoStep()) {
return false;
}
- Mutable<ILogicalExpression> mutableLogicalExpression = aggregate.getExpressions().get(0);
- ILogicalExpression logicalExpression = mutableLogicalExpression.getValue();
+ // Replace single step aggregate function with two step function call
+ final IFunctionInfo functionInfo = aggregateFunctionCall.getFunctionInfo();
+ final List<Mutable<ILogicalExpression>> arguments = aggregateFunctionCall.getArguments();
+ AggregateFunctionCallExpression twoStepCall =
+ new AggregateFunctionCallExpression(functionInfo, true, arguments);
+ final Pair<IFunctionInfo, IFunctionInfo> functionInfoPair =
+ AGGREGATE_MAP.get(aggregateFunctionCall.getFunctionIdentifier());
+ twoStepCall.setStepOneAggregate(functionInfoPair.first);
+ twoStepCall.setStepTwoAggregate(functionInfoPair.second);
+ aggregate.getExpressions().get(0).setValue(twoStepCall);
+ return true;
+ }
+
+ private AggregateFunctionCallExpression getAggregateFunctionCall(AggregateOperator aggregate) {
+ if (aggregate.getExpressions().size() == 0) {
+ return null;
+ }
+ ILogicalExpression logicalExpression = aggregate.getExpressions().get(0).getValue();
if (logicalExpression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
- return false;
+ return null;
}
AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;
-
if (AGGREGATE_MAP.containsKey(functionCall.getFunctionIdentifier())) {
- AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
- if (aggregateFunctionCall.isTwoStep()) {
- return false;
- }
- aggregateFunctionCall.setTwoStep(true);
- aggregateFunctionCall.setStepOneAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).first);
- aggregateFunctionCall.setStepTwoAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).second);
- return true;
+ return (AggregateFunctionCallExpression) functionCall;
}
- return false;
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index a3756d5..5ae5ed7 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
+import java.net.Inet4Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -92,7 +93,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
totalDataSources = (short) ds.getTotalDataSources();
childSeq = ds.getChildSeq();
valueSeq = ds.getValueSeq();
- recordDescriptors[0] = rDesc;
+ outRecDescs[0] = rDesc;
this.tag = ds.getTag();
this.hdfsConf = hdfsConf;
this.nodeControllerInfos = nodeControllerInfos;
@@ -108,7 +109,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
final IFrameFieldAppender appender = new FrameFixedFieldTupleAppender(fieldOutputCount);
final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
- final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+ final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
final DynamicContext dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData();
final ArrayBackedValueStorage jsonAbvs = new ArrayBackedValueStorage();
final String collectionName = collectionPartitions[partition % collectionPartitions.length];
@@ -157,7 +158,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
try {
hdfs.scheduleSplits();
ArrayList<Integer> schedule = hdfs
- .getScheduleForNode(InetAddress.getLocalHost().getHostAddress());
+ .getScheduleForNode(Inet4Address.getLoopbackAddress().getHostAddress());
List<InputSplit> splits = hdfs.getSplits();
List<FileSplit> fileSplits = new ArrayList<>();
for (int i : schedule) {
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
index 9353319..c26547e 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
@@ -69,7 +69,7 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe
collectionPartitions = ds.getPartitions();
dataSourceId = (short) ds.getDataSourceId();
totalDataSources = (short) ds.getTotalDataSources();
- recordDescriptors[0] = rDesc;
+ outRecDescs[0] = rDesc;
childSeq = ds.getChildSeq();
indexChildSeq = ds.getIndexChildSeq();
indexAttsSeq = ds.getIndexAttsSeq();
@@ -86,11 +86,11 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe
final IFrameFieldAppender appender = new FrameFixedFieldTupleAppender(fieldOutputCount);
final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
- final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+ final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
final String collectionName = collectionPartitions[partition % collectionPartitions.length];
final String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil(
- ctx.getIOManager().getIODevices().get(0).getMount());
+ ctx.getIoManager().getIODevices().get(0).getMount());
indexCentralizerUtil.readIndexDirectory();
final IPointable result = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java
index 6b18b33..6004d79 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java
@@ -52,7 +52,7 @@ public class ShowIndexesScalarEvaluatorFactory extends AbstractTaggedValueArgume
abvs.reset();
sb.reset(abvs);
IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil(
- ctx.getIOManager().getIODevices().get(0).getMount());
+ ctx.getIoManager().getIODevices().get(0).getMount());
indexCentralizerUtil.readIndexDirectory();
indexCentralizerUtil.getAllCollections(sb);
sb.finish();
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java
index 15fd624..6d63d5f 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocAvailableScalarEvaluatorFactory.java
@@ -57,7 +57,7 @@ public class FnDocAvailableScalarEvaluatorFactory extends AbstractTaggedValueArg
final DataInputStream di = new DataInputStream(bbis);
final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
- final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+ final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
return new AbstractTaggedValueArgumentScalarEvaluator(args) {
@Override
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
index 2fd1755..e3157af 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/node/FnDocScalarEvaluatorFactory.java
@@ -57,7 +57,7 @@ public class FnDocScalarEvaluatorFactory extends AbstractTaggedValueArgumentScal
final DataInputStream di = new DataInputStream(bbis);
final int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
- final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+ final String nodeId = ctx.getJobletContext().getServiceContext().getNodeId();
return new AbstractTaggedValueArgumentScalarEvaluator(args) {
@Override
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java
----------------------------------------------------------------------
diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java b/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java
index f5e0165..e2ca1b7 100644
--- a/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java
+++ b/vxquery-rest/src/main/java/org/apache/vxquery/app/VXQueryApplication.java
@@ -28,9 +28,13 @@ import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.application.ICCApplicationContext;
-import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.application.ICCApplication;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.vxquery.exceptions.VXQueryRuntimeException;
import org.apache.vxquery.rest.RestServer;
import org.apache.vxquery.rest.service.VXQueryConfig;
@@ -44,15 +48,20 @@ import org.kohsuke.args4j.Option;
*
* @author Erandi Ganepola
*/
-public class VXQueryApplication implements ICCApplicationEntryPoint {
+public class VXQueryApplication implements ICCApplication {
private static final Logger LOGGER = Logger.getLogger(VXQueryApplication.class.getName());
private VXQueryService vxQueryService;
private RestServer restServer;
+ private ICCServiceContext ccAppCtx;
+
+ public void init(IServiceContext serviceCtx) throws Exception {
+ ccAppCtx = (ICCServiceContext)serviceCtx;
+ }
@Override
- public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+ public void start(String[] args) throws Exception {
AppArgs appArgs = new AppArgs();
if (args != null) {
CmdLineParser parser = new CmdLineParser(appArgs);
@@ -98,6 +107,22 @@ public class VXQueryApplication implements ICCApplicationEntryPoint {
}
}
+
+ @Override
+ public Object getApplicationContext() {
+ return ccAppCtx;
+ }
+
+ @Override
+ public void registerConfig(IConfigManager configManager) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IJobCapacityController getJobCapacityController() {
+ return DefaultJobCapacityController.INSTANCE;
+ }
+
/**
* Loads properties from
*
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java
----------------------------------------------------------------------
diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java b/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java
index cd149dd..998563c 100644
--- a/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java
+++ b/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java
@@ -24,7 +24,7 @@ import static org.apache.vxquery.rest.Constants.Properties.JOIN_HASH_SIZE;
import static org.apache.vxquery.rest.Constants.Properties.MAXIMUM_DATA_SIZE;
import java.io.IOException;
-import java.net.InetAddress;
+import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.util.Arrays;
@@ -59,8 +59,6 @@ public class LocalClusterUtil {
private ClusterControllerService clusterControllerService;
private NodeControllerService nodeControllerSerivce;
- private IHyracksClientConnection hcc;
- private IHyracksDataset hds;
private VXQueryService vxQueryService;
public void init(VXQueryConfig config) throws Exception {
@@ -77,19 +75,14 @@ public class LocalClusterUtil {
clusterControllerService = new ClusterControllerService(ccConfig);
clusterControllerService.start();
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
- hds = new HyracksDataset(hcc, config.getFrameSize(), config.getAvailableProcessors());
-
// Node controller
NCConfig ncConfig = createNCConfig();
nodeControllerSerivce = new NodeControllerService(ncConfig);
nodeControllerSerivce.start();
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
-
// REST controller
- config.setHyracksClientIp(ccConfig.clientNetIpAddress);
- config.setHyracksClientPort(ccConfig.clientNetPort);
+ config.setHyracksClientIp(ccConfig.getClientListenAddress());
+ config.setHyracksClientPort(ccConfig.getClientListenPort());
vxQueryService = new VXQueryService(config);
vxQueryService.start();
}
@@ -97,35 +90,30 @@ public class LocalClusterUtil {
protected CCConfig createCCConfig() throws IOException {
String localAddress = getIpAddress();
CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = localAddress;
- ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
- ccConfig.clusterNetIpAddress = localAddress;
- ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
- ccConfig.httpPort = DEFAULT_HYRACKS_CC_HTTP_PORT;
- ccConfig.profileDumpPeriod = 10000;
- ccConfig.appCCMainClass = VXQueryApplication.class.getName();
- ccConfig.appArgs = Arrays.asList("-restPort", String.valueOf(DEFAULT_VXQUERY_REST_PORT));
-
+ ccConfig.setClientListenAddress(localAddress);
+ ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT);
+ ccConfig.setClusterListenAddress(localAddress);
+ ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
+ ccConfig.setConsoleListenPort(DEFAULT_HYRACKS_CC_HTTP_PORT);
+ ccConfig.setProfileDumpPeriod(10000);
+ ccConfig.setAppClass(VXQueryApplication.class.getName());
+ ccConfig.getAppArgs().addAll(Arrays.asList("-restPort", String.valueOf(DEFAULT_VXQUERY_REST_PORT)));
return ccConfig;
}
protected NCConfig createNCConfig() throws IOException {
String localAddress = getIpAddress();
- NCConfig ncConfig = new NCConfig();
- ncConfig.ccHost = "localhost";
- ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
- ncConfig.clusterNetIPAddress = localAddress;
- ncConfig.dataIPAddress = localAddress;
- ncConfig.resultIPAddress = localAddress;
- ncConfig.nodeId = "test_node";
- ncConfig.ioDevices = Files.createTempDirectory(ncConfig.nodeId).toString();
+ String nodeId = "test_node";
+ NCConfig ncConfig = new NCConfig(nodeId);
+ ncConfig.setClusterAddress("localhost");
+ ncConfig.setClusterPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
+ ncConfig.setClusterListenAddress(localAddress);
+ ncConfig.setDataListenAddress(localAddress);
+ ncConfig.setResultListenAddress(localAddress);
+ ncConfig.setIODevices(new String[] { Files.createTempDirectory(nodeId).toString() });
+ ncConfig.setVirtualNC();
return ncConfig;
}
-
- public IHyracksClientConnection getHyracksClientConnection() {
- return hcc;
- }
-
public VXQueryService getVxQueryService() {
return vxQueryService;
}
@@ -166,21 +154,10 @@ public class LocalClusterUtil {
}
public String getIpAddress() throws UnknownHostException {
- return InetAddress.getLocalHost().getHostAddress();
+ return Inet4Address.getLoopbackAddress().getHostAddress();
}
public int getRestPort() {
return DEFAULT_VXQUERY_REST_PORT;
}
-
- @Deprecated
- public IHyracksClientConnection getConnection() {
- return hcc;
- }
-
- @Deprecated
- public IHyracksDataset getDataset() {
- return hds;
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java
----------------------------------------------------------------------
diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java
index a88ae1c..6c3a25d 100644
--- a/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java
+++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/request/QueryRequest.java
@@ -140,7 +140,7 @@ public class QueryRequest {
}
public String toString() {
- return String.format("{ statement : %s }", statement);
+ return String.format("{ statement : \"%s\" }", statement);
}
public String getRequestId() {
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java
----------------------------------------------------------------------
diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java
index 1d51b6a..884abf4 100644
--- a/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java
+++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/VXQueryService.java
@@ -186,6 +186,10 @@ public class VXQueryService {
return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM)
.withMessage("Hyracks connection problem: " + e.getMessage()).build());
}
+ if (nodeControllerInfos.isEmpty()) {
+ return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM)
+ .withMessage("No NodeControllers available").build());
+ }
// Adding a query compilation listener
VXQueryCompilationListener listener = new VXQueryCompilationListener(response,
@@ -360,7 +364,7 @@ public class VXQueryService {
// This loop is required for XTests to reliably identify the error code of
// SystemException.
- while (reader.getResultStatus() == DatasetJobRecord.Status.RUNNING) {
+ while (reader.getResultStatus().getState() == DatasetJobRecord.State.RUNNING) {
Thread.sleep(100);
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
----------------------------------------------------------------------
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
index 4d2ae8a..b2d7f04 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
@@ -17,18 +17,11 @@
package org.apache.vxquery.xtest;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.vxquery.app.util.LocalClusterUtil;
import org.apache.vxquery.rest.service.VXQueryConfig;
-import java.io.IOException;
-
public class TestClusterUtil {
- private static HyracksConnection hcc;
- private static HyracksDataset hds;
-
public static final LocalClusterUtil localClusterUtil = new LocalClusterUtil();
private TestClusterUtil() {
@@ -44,31 +37,12 @@ public class TestClusterUtil {
return vxqConfig;
}
- public static void startCluster(XTestOptions opts, LocalClusterUtil localClusterUtil) throws IOException {
- try {
- VXQueryConfig config = loadConfiguration(opts);
- localClusterUtil.init(config);
- hcc = (HyracksConnection) localClusterUtil.getConnection();
- hds = (HyracksDataset) localClusterUtil.getDataset();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- public static void stopCluster(LocalClusterUtil localClusterUtil) throws IOException {
- try {
- localClusterUtil.deinit();
- } catch (Exception e) {
- throw new IOException(e);
- }
+ public static void startCluster(XTestOptions opts, LocalClusterUtil localClusterUtil) throws Exception {
+ VXQueryConfig config = loadConfiguration(opts);
+ localClusterUtil.init(config);
}
- public static HyracksConnection getConnection() {
- return hcc;
+ public static void stopCluster(LocalClusterUtil localClusterUtil) throws Exception {
+ localClusterUtil.deinit();
}
-
- public static HyracksDataset getDataset() {
- return hds;
- }
-
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
----------------------------------------------------------------------
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
index df7a71d..df10271 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
@@ -100,7 +100,7 @@ public class XTest {
}
try {
TestClusterUtil.stopCluster(TestClusterUtil.localClusterUtil);
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
try {
http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d1175d2/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
----------------------------------------------------------------------
diff --git a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
index 8f77de4..afce2f1 100644
--- a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
+++ b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
@@ -87,7 +87,7 @@ public abstract class AbstractXQueryTest {
}
@BeforeClass
- public static void setup() throws IOException {
+ public static void setup() throws Exception {
TestClusterUtil.startCluster(getDefaultTestOptions(), TestClusterUtil.localClusterUtil);
setupFS();
}
@@ -109,7 +109,7 @@ public abstract class AbstractXQueryTest {
}
@AfterClass
- public static void shutdown() throws IOException {
+ public static void shutdown() throws Exception {
removeFS();
TestClusterUtil.stopCluster(TestClusterUtil.localClusterUtil);
}