You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by ch...@apache.org on 2017/07/11 17:24:36 UTC
[3/3] vxquery git commit: [VXQUERY-196][VXQUERY-204][VXQUERY-228]
Cleaning up indexing query statements
[VXQUERY-196][VXQUERY-204][VXQUERY-228] Cleaning up indexing query statements
1) Move the XPath out of the arguments of collection-from-index, leaving only the collection path.
2) Hide index usage from the user by adding a rewrite rule that checks whether an index exists for the collection.
3) Remove the unnecessary header tags that collection-from-index creates.
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/4a38f670
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/4a38f670
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/4a38f670
Branch: refs/heads/master
Commit: 4a38f670cd11231bd18a321cbacdd4f4e109b50a
Parents: 37642be
Author: Christina Pavlopoulou <cp...@ucr.edu>
Authored: Tue May 30 11:54:31 2017 -0700
Committer: Christina Pavlopoulou <cp...@ucr.edu>
Committed: Tue Jul 11 10:20:22 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/vxquery/cli/VXQuery.java | 141 ++++----
.../apache/vxquery/common/VXQueryCommons.java | 3 +-
.../rewriter/rules/IntroduceCollectionRule.java | 30 +-
.../rewriter/rules/IntroduceIndexingRule.java | 10 +-
.../rules/PushChildIntoDataScanRule.java | 7 +-
.../rules/PushValueIntoDatascanRule.java | 3 +
.../vxquery/functions/builtin-functions.xml | 3 +-
.../vxquery/index/IndexDocumentBuilder.java | 14 +-
.../VXQueryCollectionOperatorDescriptor.java | 58 ++--
.../metadata/VXQueryIndexingDataSource.java | 16 +-
.../VXQueryIndexingOperatorDescriptor.java | 181 ++++++-----
.../metadata/VXQueryMetadataProvider.java | 34 +-
.../functions/index/IndexConstructorUtil.java | 69 ++--
.../index/ShowIndexScalarEvaluatorFactory.java | 67 ----
.../ShowIndexesScalarEvaluatorFactory.java | 67 ++++
.../functions/index/VXQueryIndexReader.java | 115 +++++--
.../index/centralizer/IndexCentralizerUtil.java | 175 ++++++++++
.../index/centralizer/IndexDirectory.java | 43 +++
.../index/centralizer/IndexLocator.java | 52 +++
.../indexCentralizer/IndexCentralizerUtil.java | 165 ----------
.../index/indexCentralizer/IndexDirectory.java | 42 ---
.../index/indexCentralizer/IndexLocator.java | 50 ---
.../functions/index/update/Constants.java | 28 ++
.../functions/index/update/IndexUpdater.java | 304 ++++++++++++++++++
.../functions/index/update/MetaFileUtil.java | 205 ++++++++++++
.../functions/index/update/VXQueryIndex.java | 42 +++
.../functions/index/update/XmlMetadata.java | 74 +++++
.../index/update/XmlMetadataCollection.java | 66 ++++
.../functions/index/updateIndex/Constants.java | 25 --
.../index/updateIndex/IndexUpdater.java | 319 -------------------
.../index/updateIndex/MetaFileUtil.java | 198 ------------
.../index/updateIndex/VXQueryIndex.java | 42 ---
.../index/updateIndex/XmlMetadata.java | 72 -----
.../updateIndex/XmlMetadataCollection.java | 66 ----
.../json/JnDocScalarEvaluatorFactory.java | 2 +-
.../FnDocAvailableScalarEvaluatorFactory.java | 2 +-
.../node/FnDocScalarEvaluatorFactory.java | 2 +-
.../runtime/functions/util/FunctionHelper.java | 21 +-
.../vxquery/xmlparser/SAXContentHandler.java | 7 +-
.../xmlquery/query/XMLQueryCompiler.java | 4 +-
.../vxquery/indexing/MetaFileUtilTest.java | 4 +-
.../apache/vxquery/indexing/TestConstants.java | 4 +-
.../xmlquery/query/SimpleXQueryTest.java | 2 +-
.../org/apache/vxquery/xtest/TestRunner.java | 56 +++-
.../Indexing/Partition-1/useIndex1_user.txt | 2 +
.../Indexing/Partition-2/useIndex1_user.txt | 2 +
.../Indexing/Partition-4/useIndex1_user.txt | 2 +
.../XQuery/Indexing/Partition-1/useIndex1.xq | 4 +-
.../Indexing/Partition-1/useIndex1_user.xq | 25 ++
.../XQuery/Indexing/Partition-1/useIndex2.xq | 2 +-
.../XQuery/Indexing/Partition-1/useIndex3.xq | 2 +-
.../XQuery/Indexing/Partition-1/useIndex4.xq | 2 +-
.../XQuery/Indexing/Partition-1/useIndex5.xq | 2 +-
.../XQuery/Indexing/Partition-1/useIndex6.xq | 2 +-
.../XQuery/Indexing/Partition-1/useIndex7.xq | 4 +-
.../XQuery/Indexing/Partition-2/useIndex1.xq | 2 +-
.../Indexing/Partition-2/useIndex1_user.xq | 25 ++
.../XQuery/Indexing/Partition-2/useIndex2.xq | 2 +-
.../XQuery/Indexing/Partition-2/useIndex3.xq | 2 +-
.../XQuery/Indexing/Partition-2/useIndex4.xq | 2 +-
.../XQuery/Indexing/Partition-2/useIndex5.xq | 2 +-
.../XQuery/Indexing/Partition-2/useIndex6.xq | 2 +-
.../XQuery/Indexing/Partition-2/useIndex7.xq | 4 +-
.../XQuery/Indexing/Partition-4/useIndex1.xq | 2 +-
.../Indexing/Partition-4/useIndex1_user.xq | 25 ++
.../XQuery/Indexing/Partition-4/useIndex2.xq | 2 +-
.../XQuery/Indexing/Partition-4/useIndex3.xq | 2 +-
.../XQuery/Indexing/Partition-4/useIndex4.xq | 2 +-
.../XQuery/Indexing/Partition-4/useIndex5.xq | 2 +-
.../XQuery/Indexing/Partition-4/useIndex6.xq | 2 +-
.../XQuery/Indexing/Partition-4/useIndex7.xq | 4 +-
.../src/test/resources/VXQueryCatalog.xml | 4 +-
.../src/test/resources/cat/IndexingQueries.xml | 15 +
73 files changed, 1656 insertions(+), 1384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
----------------------------------------------------------------------
diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
index e0e3843..25ff9c4 100644
--- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
+++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
@@ -14,6 +14,7 @@
*/
package org.apache.vxquery.cli;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -23,6 +24,7 @@ import java.io.StringReader;
import java.net.InetAddress;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@@ -66,12 +68,13 @@ import org.kohsuke.args4j.Option;
public class VXQuery {
private final CmdLineOptions opts;
+ private final CmdLineOptions indexOpts;
private ClusterControllerService cc;
private NodeControllerService[] ncs;
private IHyracksClientConnection hcc;
private IHyracksDataset hds;
-
+ private List<String> collectionList;
private ResultSetId resultSetId;
private static List<String> timingMessages = new ArrayList<>();
private static long sumTiming;
@@ -87,6 +90,16 @@ public class VXQuery {
*/
public VXQuery(CmdLineOptions opts) {
this.opts = opts;
+ // The index query returns only the result, without any other information.
+ this.indexOpts = opts;
+ indexOpts.showAST = false;
+ indexOpts.showOET = false;
+ indexOpts.showQuery = false;
+ indexOpts.showRP = false;
+ indexOpts.showTET = false;
+ indexOpts.timing = false;
+ indexOpts.compileOnly = false;
+ this.collectionList = new ArrayList<String>();
}
/**
@@ -168,71 +181,87 @@ public class VXQuery {
* @throws SystemException
* @throws Exception
*/
+
private void runQueries() throws Exception {
- Date start;
- Date end;
- for (String query : opts.arguments) {
- String qStr = slurp(query);
- if (opts.showQuery) {
- System.err.println(qStr);
+ List<String> queries = opts.arguments;
+ // Run the showIndexes query before executing any target query, to store the index metadata
+ List<String> queriesIndex = new ArrayList<String>();
+ queriesIndex.add("vxquery-xtest/src/test/resources/Queries/XQuery/Indexing/Partition-1/showIndexes.xq");
+ OutputStream resultStream = new ByteArrayOutputStream();
+ executeQuery(queriesIndex.get(0), 1, resultStream, indexOpts);
+ ByteArrayOutputStream bos = (ByteArrayOutputStream) resultStream;
+ String result = new String(bos.toByteArray());
+ String[] collections = result.split("\n");
+ this.collectionList = Arrays.asList(collections);
+ executeQueries(queries);
+ }
+
+ public void executeQueries(List<String> queries) throws Exception {
+ for (String query : queries) {
+ OutputStream resultStream = System.out;
+ if (opts.resultFile != null) {
+ resultStream = new FileOutputStream(new File(opts.resultFile));
}
+ executeQuery(query, opts.repeatExec, resultStream, opts);
+ }
+ }
- VXQueryCompilationListener listener = new VXQueryCompilationListener(opts.showAST, opts.showTET,
- opts.showOET, opts.showRP);
+ public void executeQuery(String query, int repeatedExecution, OutputStream resultStream, CmdLineOptions options)
+ throws Exception {
+ PrintWriter writer = new PrintWriter(resultStream, true);
+ String qStr = slurp(query);
+ if (opts.showQuery) {
+ writer.println(qStr);
+ }
+ VXQueryCompilationListener listener = new VXQueryCompilationListener(opts.showAST, opts.showTET, opts.showOET,
+ opts.showRP);
- start = opts.timing ? new Date() : null;
+ Date start = opts.timing ? new Date() : null;
- Map<String, NodeControllerInfo> nodeControllerInfos = null;
- if (hcc != null) {
- nodeControllerInfos = hcc.getNodeControllerInfos();
- }
- XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize,
- opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf);
- resultSetId = createResultSetId();
- CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
- resultSetId, null);
- compiler.compile(query, new StringReader(qStr), ccb, opts.optimizationLevel);
- // if -timing argument passed, show the starting and ending times
- if (opts.timing) {
- end = new Date();
- timingMessage("Compile time: " + (end.getTime() - start.getTime()) + " ms");
- }
- if (opts.compileOnly) {
- continue;
- }
-
- Module module = compiler.getModule();
- JobSpecification js = module.getHyracksJobSpecification();
+ Map<String, NodeControllerInfo> nodeControllerInfos = null;
+ if (hcc != null) {
+ nodeControllerInfos = hcc.getNodeControllerInfos();
+ }
+ XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize,
+ opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf);
+ resultSetId = createResultSetId();
+ CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
+ resultSetId, null);
+ compiler.compile(query, new StringReader(qStr), ccb, opts.optimizationLevel, this.collectionList);
+ // if -timing argument passed, show the starting and ending times
+ Date end = opts.timing ? new Date() : null;
+ if (opts.timing) {
+ timingMessage("Compile time: " + (end.getTime() - start.getTime()) + " ms");
+ }
+ if (opts.compileOnly) {
+ return;
+ }
- DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext());
- js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory()));
+ Module module = compiler.getModule();
+ JobSpecification js = module.getHyracksJobSpecification();
- OutputStream resultStream = System.out;
- if (opts.resultFile != null) {
- resultStream = new FileOutputStream(new File(opts.resultFile));
- }
+ DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext());
+ js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory()));
- PrintWriter writer = new PrintWriter(resultStream, true);
- // Repeat execution for number of times provided in -repeatexec argument
- for (int i = 0; i < opts.repeatExec; ++i) {
- start = opts.timing ? new Date() : null;
- runJob(js, writer);
- // if -timing argument passed, show the starting and ending times
- if (opts.timing) {
- end = new Date();
- long currentRun = end.getTime() - start.getTime();
- if ((i + 1) > opts.timingIgnoreQueries) {
- sumTiming += currentRun;
- sumSquaredTiming += currentRun * currentRun;
- if (currentRun < minTiming) {
- minTiming = currentRun;
- }
- if (maxTiming < currentRun) {
- maxTiming = currentRun;
- }
+ // Repeat execution for number of times provided in -repeatexec argument
+ for (int i = 0; i < repeatedExecution; ++i) {
+ start = opts.timing ? new Date() : null;
+ runJob(js, writer);
+ // if -timing argument passed, show the starting and ending times
+ if (opts.timing) {
+ end = new Date();
+ long currentRun = end.getTime() - start.getTime();
+ if ((i + 1) > opts.timingIgnoreQueries) {
+ sumTiming += currentRun;
+ sumSquaredTiming += currentRun * currentRun;
+ if (currentRun < minTiming) {
+ minTiming = currentRun;
+ }
+ if (maxTiming < currentRun) {
+ maxTiming = currentRun;
}
- timingMessage("Job (" + (i + 1) + ") execution time: " + currentRun + " ms");
}
+ timingMessage("Job (" + (i + 1) + ") execution time: " + currentRun + " ms");
}
}
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java b/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java
index ceaf3c7..400fb15 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/common/VXQueryCommons.java
@@ -35,7 +35,8 @@ public class VXQueryCommons {
static {
indexingFunctions.add(BuiltinFunctions.FN_BUILD_INDEX_ON_COLLECTION_1.getFunctionIdentifier());
- indexingFunctions.add(BuiltinFunctions.FN_COLLECTION_FROM_INDEX_2.getFunctionIdentifier());
+ indexingFunctions.add(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier());
+ indexingFunctions.add(BuiltinFunctions.FN_COLLECTION_FROM_INDEX_1.getFunctionIdentifier());
indexingFunctions.add(BuiltinFunctions.FN_DELETE_INDEX_1.getFunctionIdentifier());
indexingFunctions.add(BuiltinFunctions.FN_UPDATE_INDEX_1.getFunctionIdentifier());
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
index 20283d8..11d3795 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
@@ -16,12 +16,16 @@
*/
package org.apache.vxquery.compiler.rewriter.rules;
+import java.util.ArrayList;
+
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.vxquery.common.VXQueryCommons;
import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext;
import org.apache.vxquery.metadata.VXQueryCollectionDataSource;
+import org.apache.vxquery.metadata.VXQueryIndexingDataSource;
+import org.apache.vxquery.metadata.VXQueryMetadataProvider;
import org.apache.vxquery.types.AnyItemType;
import org.apache.vxquery.types.Quantifier;
import org.apache.vxquery.types.SequenceType;
@@ -61,11 +65,35 @@ public class IntroduceCollectionRule extends AbstractCollectionRule {
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context;
String[] args = getFunctionalArguments(opRef, VXQueryCommons.collectionFunctions);
-
+ VXQueryMetadataProvider metadata = (VXQueryMetadataProvider) context.getMetadataProvider();
if (args != null) {
String collectionName = args[0];
// Build the new operator and update the query plan.
int collectionId = vxqueryContext.newCollectionId();
+ ArrayList<String> collectionTempName = new ArrayList<String>();
+ collectionTempName.add(collectionName);
+ if (collectionName.contains("|")) {
+ collectionTempName.remove(0);
+ int index = collectionName.indexOf("|");
+ int start = 0;
+ while (index >= 0) {
+ collectionTempName.add(collectionName.substring(start, index));
+ start = index + 1;
+ index = collectionName.indexOf("|", index + 1);
+ if (index == -1) {
+ collectionTempName.add(collectionName.substring(start));
+ }
+ }
+ }
+ if (metadata.hasIndex(collectionTempName)) {
+ VXQueryIndexingDataSource ids = VXQueryIndexingDataSource.create(collectionId, collectionName,
+ SequenceType.create(AnyItemType.INSTANCE, Quantifier.QUANT_STAR),
+ functionCall.getFunctionIdentifier().getName());
+ if (ids != null) {
+ ids.setTotalDataSources(vxqueryContext.getTotalDataSources());
+ return setDataSourceScan(ids, opRef);
+ }
+ }
VXQueryCollectionDataSource ds = VXQueryCollectionDataSource.create(collectionId, collectionName,
SequenceType.create(AnyItemType.INSTANCE, Quantifier.QUANT_STAR));
if (ds != null) {
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java
index 5b96131..6e60d75 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceIndexingRule.java
@@ -33,19 +33,19 @@ import org.apache.vxquery.types.SequenceType;
public class IntroduceIndexingRule extends AbstractCollectionRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context;
String args[] = getFunctionalArguments(opRef, VXQueryCommons.indexingFunctions);
if (args != null) {
String collection = args[0];
- String elementPath = args.length > 1?args[1]:null;
-
// Build the new operator and update the query plan.
int collectionId = vxqueryContext.newCollectionId();
- VXQueryIndexingDataSource ids = VXQueryIndexingDataSource.create(collectionId, collection, elementPath,
- SequenceType.create(AnyItemType.INSTANCE, Quantifier.QUANT_STAR), functionCall.getFunctionIdentifier().getName());
+ VXQueryIndexingDataSource ids = VXQueryIndexingDataSource.create(collectionId, collection,
+ SequenceType.create(AnyItemType.INSTANCE, Quantifier.QUANT_STAR),
+ functionCall.getFunctionIdentifier().getName());
if (ids != null) {
ids.setTotalDataSources(vxqueryContext.getTotalDataSources());
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java
index dbcce54..2773154 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushChildIntoDataScanRule.java
@@ -55,12 +55,9 @@ public class PushChildIntoDataScanRule extends AbstractPushExpressionIntoDatasca
@Override
boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
- //TODO: indexing needs to be extended to support push child into datascan
- if (datasource.usingIndex()) {
- return false;
- }
- boolean added = false;
List<Mutable<ILogicalExpression>> finds = new ArrayList<Mutable<ILogicalExpression>>();
+ boolean added = false;
+
ExpressionToolbox.findAllFunctionExpressions(expression, BuiltinOperators.CHILD.getFunctionIdentifier(), finds);
for (int i = finds.size(); i > 0; --i) {
int typeId = ExpressionToolbox.getTypeExpressionTypeArgument(finds.get(i - 1));
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
index 1d8a55d..b901469 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java
@@ -59,6 +59,9 @@ public class PushValueIntoDatascanRule extends AbstractPushExpressionIntoDatasca
@Override
boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) {
+ if (datasource.usingIndex()) {
+ return false;
+ }
VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) datasource;
boolean added = false;
List<Mutable<ILogicalExpression>> finds = new ArrayList<Mutable<ILogicalExpression>>();
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index d64f423..4932a78 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -152,7 +152,6 @@
<!-- fn:collection-from-index($indexfolder as xs:string?, $elementpath as xs:string?) as node()* -->
<function name="fn:collection-from-index">
<param name="index-folder" type="xs:string?"/>
- <param name="element-path" type="xs:string?"/>
<return type="node()*"/>
<property type="DocumentOrder" class="org.apache.vxquery.compiler.rewriter.rules.propagationpolicies.InputPropertyPropagationPolicy">
<argument value="0"/>
@@ -166,7 +165,7 @@
<!-- fn:show-indexes as node()* -->
<function name="fn:show-indexes">
<return type="node()*"/>
- <runtime type="scalar" class="org.apache.vxquery.runtime.functions.index.ShowIndexScalarEvaluatorFactory"/>
+ <runtime type="scalar" class="org.apache.vxquery.runtime.functions.index.ShowIndexesScalarEvaluatorFactory"/>
</function>
<!-- fn:collection-with-tag($arg1 as xs:string?, $arg2 as xs:string?) as node()* -->
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/index/IndexDocumentBuilder.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/index/IndexDocumentBuilder.java b/vxquery-core/src/main/java/org/apache/vxquery/index/IndexDocumentBuilder.java
index 7524da4..1df31dd 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/index/IndexDocumentBuilder.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/index/IndexDocumentBuilder.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.BooleanPointable;
import org.apache.hyracks.data.std.primitive.BytePointable;
import org.apache.hyracks.data.std.primitive.DoublePointable;
@@ -56,12 +55,10 @@ import org.apache.vxquery.datamodel.accessors.nodes.NodeTreePointable;
import org.apache.vxquery.datamodel.accessors.nodes.TextOrCommentNodePointable;
import org.apache.vxquery.datamodel.values.ValueTag;
import org.apache.vxquery.runtime.functions.cast.CastToStringOperation;
-import org.apache.vxquery.runtime.functions.index.updateIndex.Constants;
+import org.apache.vxquery.runtime.functions.index.update.Constants;
import org.apache.vxquery.serializer.XMLSerializer;
public class IndexDocumentBuilder extends XMLSerializer {
- private final IPointable treePointable;
-
private final PointablePool pp;
private NodeTreePointable ntp;
@@ -88,16 +85,11 @@ public class IndexDocumentBuilder extends XMLSerializer {
}
//TODO: Handle Processing Instructions, PrefixedNames, and Namepsace entries
- public IndexDocumentBuilder(IPointable tree, IndexWriter inWriter, String file) {
- this.treePointable = tree;
+ public IndexDocumentBuilder(TaggedValuePointable tvp, IndexWriter inWriter, String file) {
writer = inWriter;
this.filePath = file;
- //convert to tagged value pointable
- TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
- tvp.set(treePointable.getByteArray(), 0, treePointable.getLength());
-
//get bytes and info from doc pointer
bstart = tvp.getByteArray();
sstart = tvp.getStartOffset();
@@ -105,7 +97,7 @@ public class IndexDocumentBuilder extends XMLSerializer {
doc = new Document();
- results = new ArrayList<ComplexItem>();
+ results = new ArrayList<>();
pp = PointablePoolFactory.INSTANCE.createPointablePool();
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index 623b48c..a3756d5 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -59,7 +59,6 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.hdfs.ContextFactory;
@@ -129,38 +128,14 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
fta.reset(buffer);
String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
- Reader input;
+
if (!collectionModifiedName.contains("hdfs:/")) {
File collectionDirectory = new File(collectionModifiedName);
// check if directory is in the local file system
if (collectionDirectory.exists()) {
// Go through each tuple.
if (collectionDirectory.isDirectory()) {
- for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
- Iterator<File> it = FileUtils.iterateFiles(collectionDirectory,
- new VXQueryIOFileFilter(), TrueFileFilter.INSTANCE);
- while (it.hasNext()) {
- File file = it.next();
- String fileName = file.getName().toLowerCase();
- if (fileName.endsWith(".xml")) {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Starting to read XML document: " + file.getAbsolutePath());
- }
- parser.parseElements(file, writer, tupleIndex);
- } else if (fileName.endsWith(".json")) {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Starting to read JSON document: " + file.getAbsolutePath());
- }
- try {
- jsonAbvs.reset();
- input = new InputStreamReader(new FileInputStream(file));
- jparser.parse(input, jsonAbvs, writer, appender);
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e.toString());
- }
- }
- }
- }
+ xmlAndJsonCollection(collectionDirectory);
} else {
throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":"
+ collectionDirectory.getAbsolutePath() + ") passed to collection.");
@@ -272,6 +247,35 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
}
}
+ public void xmlAndJsonCollection(File directory) throws HyracksDataException {
+ Reader input;
+ for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+ Iterator<File> it = FileUtils.iterateFiles(directory, new VXQueryIOFileFilter(),
+ TrueFileFilter.INSTANCE);
+ while (it.hasNext()) {
+ File file = it.next();
+ String fileName = file.getName().toLowerCase();
+ if (fileName.endsWith(".xml")) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting to read XML document: " + file.getAbsolutePath());
+ }
+ parser.parseElements(file, writer, tupleIndex);
+ } else if (fileName.endsWith(".json")) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting to read JSON document: " + file.getAbsolutePath());
+ }
+ try {
+ jsonAbvs.reset();
+ input = new InputStreamReader(new FileInputStream(file));
+ jparser.parse(input, jsonAbvs, writer, appender);
+ } catch (FileNotFoundException e) {
+ throw new HyracksDataException(e.toString());
+ }
+ }
+ }
+ }
+ }
+
@Override
public void fail() throws HyracksDataException {
writer.fail();
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
index ea69cfd..d55530d 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java
@@ -31,14 +31,11 @@ import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain;
*/
public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
- private String elementPath;
private String function;
- private VXQueryIndexingDataSource(int id, String collection, String elementPath, Object[] types,
- String functionCall) {
+ private VXQueryIndexingDataSource(int id, String collection, Object[] types, String functionCall) {
this.dataSourceId = id;
this.collectionName = collection;
- this.elementPath = elementPath;
this.function = functionCall;
this.collectionPartitions = collectionName.split(DELIMITER);
this.types = types;
@@ -56,13 +53,8 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
this.valueSeq = new ArrayList<>();
}
- public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type,
- String function) {
- return new VXQueryIndexingDataSource(id, collection, index, new Object[] { type }, function);
- }
-
- public String getElementPath() {
- return elementPath;
+ public static VXQueryIndexingDataSource create(int id, String collection, Object type, String function) {
+ return new VXQueryIndexingDataSource(id, collection, new Object[] { type }, function);
}
public String getFunctionCall() {
@@ -71,7 +63,7 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource {
@Override
public String toString() {
- return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath
+ return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + this.childSeq
+ ", function=" + function + "]";
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
index ac92a0e..3563713 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java
@@ -16,15 +16,13 @@
*/
package org.apache.vxquery.metadata;
-import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Map;
+import java.util.List;
import java.util.logging.Logger;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameFieldAppender;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,23 +33,20 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
import org.apache.vxquery.datamodel.values.XDMConstants;
import org.apache.vxquery.exceptions.SystemException;
import org.apache.vxquery.functions.BuiltinFunctions;
import org.apache.vxquery.runtime.functions.index.IndexConstructorUtil;
import org.apache.vxquery.runtime.functions.index.VXQueryIndexReader;
-import org.apache.vxquery.runtime.functions.index.indexCentralizer.IndexCentralizerUtil;
-import org.apache.vxquery.runtime.functions.index.updateIndex.IndexUpdater;
+import org.apache.vxquery.runtime.functions.index.centralizer.IndexCentralizerUtil;
+import org.apache.vxquery.runtime.functions.index.update.IndexUpdater;
import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
@@ -61,18 +56,18 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe
private short dataSourceId;
private short totalDataSources;
private String[] collectionPartitions;
- private String elementPath;
private final String functionCall;
+ private List<Integer> childSeq;
public VXQueryIndexingOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryIndexingDataSource ds,
- RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
+ RecordDescriptor rDesc) {
super(spec, 1, 1);
this.functionCall = ds.getFunctionCall();
collectionPartitions = ds.getPartitions();
dataSourceId = (short) ds.getDataSourceId();
totalDataSources = (short) ds.getTotalDataSources();
recordDescriptors[0] = rDesc;
- this.elementPath = ds.getElementPath();
+ childSeq = ds.getChildSeq();
}
@Override
@@ -87,10 +82,11 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe
final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources);
final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
final String collectionName = collectionPartitions[partition % collectionPartitions.length];
- String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
+ final String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil(
ctx.getIOManager().getIODevices().get(0).getMount());
indexCentralizerUtil.readIndexDirectory();
+ final IPointable result = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
@Override
@@ -103,100 +99,115 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
fta.reset(buffer);
- IPointable result = new TaggedValuePointable();
-
- final UTF8StringPointable stringp = (UTF8StringPointable) UTF8StringPointable.FACTORY.createPointable();
- final TaggedValuePointable nodep = (TaggedValuePointable) TaggedValuePointable.FACTORY
- .createPointable();
-
- final ByteBufferInputStream bbis = new ByteBufferInputStream();
- final DataInputStream di = new DataInputStream(bbis);
- final SequenceBuilder sb = new SequenceBuilder();
final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage();
- String indexModifiedName;
+ abvs.reset();
+ abvsFileNode.reset();
+
if (collectionModifiedName.contains("hdfs://")) {
throw new HyracksDataException("Indexing support for HDFS not yet implemented.");
} else {
if (functionCall.equals(
BuiltinFunctions.FN_BUILD_INDEX_ON_COLLECTION_1.getFunctionIdentifier().getName())) {
- indexModifiedName = indexCentralizerUtil.putIndexForCollection(collectionModifiedName);
- File collectionDirectory = new File(collectionModifiedName);
-
- //check if directory is in the local file system
- if (collectionDirectory.exists() && collectionDirectory.isDirectory()) {
- IndexConstructorUtil indexConstructorUtil = new IndexConstructorUtil();
- try {
- indexConstructorUtil.evaluate(collectionModifiedName, indexModifiedName, result,
- stringp, bbis, di, sb, abvs, nodeIdProvider, abvsFileNode, nodep, false,
- nodeId);
- XDMConstants.setTrue(result);
- FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(),
- result.getStartOffset(), result.getLength());
- } catch (SystemException e) {
- throw new HyracksDataException("Could not create index for collection: "
- + collectionName + " in dir: " + indexModifiedName + " " + e.getMessage());
- }
- } else {
- throw new HyracksDataException("Cannot find Collection Directory (" + nodeId + ":"
- + collectionDirectory.getAbsolutePath() + ")");
- }
- } else if (functionCall
- .equals(BuiltinFunctions.FN_UPDATE_INDEX_1.getFunctionIdentifier().getName())) {
- indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName);
- IndexUpdater updater = new IndexUpdater(indexModifiedName, result, stringp, bbis, di, sb, abvs,
- nodeIdProvider, abvsFileNode, nodep, nodeId);
try {
- updater.setup();
- updater.updateIndex();
- updater.updateMetadataFile();
- updater.exit();
- XDMConstants.setTrue(result);
- FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(),
- result.getStartOffset(), result.getLength());
+ createIndex(result, abvs, abvsFileNode);
} catch (IOException e) {
- throw new HyracksDataException(
- "Could not update index in " + indexModifiedName + " " + e.getMessage());
+ throw new HyracksDataException(e);
}
} else if (functionCall
+ .equals(BuiltinFunctions.FN_UPDATE_INDEX_1.getFunctionIdentifier().getName())) {
+ updateIndex(result, abvs, abvsFileNode);
+ } else if (functionCall
.equals(BuiltinFunctions.FN_DELETE_INDEX_1.getFunctionIdentifier().getName())) {
- indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName);
- IndexUpdater updater = new IndexUpdater(indexModifiedName, result, stringp, bbis, di, sb, abvs,
- nodeIdProvider, abvsFileNode, nodep, nodeId);
- indexCentralizerUtil.deleteEntryForCollection(collectionModifiedName);
- try {
- updater.setup();
- updater.deleteAllIndexes();
- XDMConstants.setTrue(result);
- FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(),
- result.getStartOffset(), result.getLength());
- } catch (IOException e) {
- throw new HyracksDataException(
- "Could not delete index in " + indexModifiedName + " " + e.getMessage());
- }
-
+ deleteIndex(result, abvs, abvsFileNode);
} else if (functionCall
- .equals(BuiltinFunctions.FN_COLLECTION_FROM_INDEX_2.getFunctionIdentifier().getName())) {
- indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName);
- VXQueryIndexReader indexReader = new VXQueryIndexReader(ctx, indexModifiedName, elementPath);
- try {
- indexReader.init();
- while (indexReader.step(result)) {
- FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(),
- result.getStartOffset(), result.getLength());
- }
- } catch (AlgebricksException e) {
- throw new HyracksDataException("Could not read index.");
- }
-
+ .equals(BuiltinFunctions.FN_COLLECTION_FROM_INDEX_1.getFunctionIdentifier().getName())
+ || functionCall
+ .equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier().getName())) {
+ usingIndex(result);
} else {
throw new HyracksDataException("Unsupported function call (" + functionCall + ")");
}
}
}
+ public void createIndex(IPointable result, ArrayBackedValueStorage abvs,
+ ArrayBackedValueStorage abvsFileNode) throws IOException {
+ String indexModifiedName = indexCentralizerUtil.putIndexForCollection(collectionModifiedName);
+ File collectionDirectory = new File(collectionModifiedName);
+
+ //check if directory is in the local file system
+ if (collectionDirectory.exists() && collectionDirectory.isDirectory()) {
+ IndexConstructorUtil indexConstructorUtil = new IndexConstructorUtil();
+ try {
+ indexConstructorUtil.evaluate(collectionModifiedName, indexModifiedName, result, abvs,
+ nodeIdProvider, abvsFileNode, false, nodeId);
+ XDMConstants.setTrue(result);
+ FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(), result.getStartOffset(),
+ result.getLength());
+ } catch (SystemException e) {
+ throw new HyracksDataException("Could not create index for collection: " + collectionName
+ + " in dir: " + indexModifiedName + " " + e.getMessage(), e);
+ }
+ } else {
+ throw new HyracksDataException("Cannot find Collection Directory (" + nodeId + ":"
+ + collectionDirectory.getAbsolutePath() + ")");
+ }
+ }
+
+ public void updateIndex(IPointable result, ArrayBackedValueStorage abvs,
+ ArrayBackedValueStorage abvsFileNode) throws HyracksDataException {
+ String indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName);
+ IndexUpdater updater = new IndexUpdater(indexModifiedName, result, abvs, nodeIdProvider, abvsFileNode,
+ nodeId);
+ try {
+ updater.setup();
+ updater.updateIndex();
+ updater.updateMetadataFile();
+ updater.exit();
+ XDMConstants.setTrue(result);
+ FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(), result.getStartOffset(),
+ result.getLength());
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "Could not update index in " + indexModifiedName + " " + e.getMessage(), e);
+ }
+ }
+
+ public void deleteIndex(IPointable result, ArrayBackedValueStorage abvs,
+ ArrayBackedValueStorage abvsFileNode) throws HyracksDataException {
+ String indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName);
+ IndexUpdater updater = new IndexUpdater(indexModifiedName, result, abvs, nodeIdProvider, abvsFileNode,
+ nodeId);
+ indexCentralizerUtil.deleteEntryForCollection(collectionModifiedName);
+ try {
+ updater.setup();
+ updater.deleteAllIndexes();
+ XDMConstants.setTrue(result);
+ FrameUtils.appendFieldToWriter(writer, appender, result.getByteArray(), result.getStartOffset(),
+ result.getLength());
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "Could not delete index in " + indexModifiedName + " " + e.getMessage(), e);
+ }
+ }
+
+ public void usingIndex(IPointable result) throws HyracksDataException {
+ String indexModifiedName = indexCentralizerUtil.getIndexForCollection(collectionModifiedName);
+ VXQueryIndexReader indexReader = new VXQueryIndexReader(ctx, indexModifiedName, childSeq, appender);
+ try {
+ indexReader.init();
+ for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+ while (indexReader.step(result, writer, tupleIndex)) {
+ }
+ }
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException("Could not read index.", e);
+ }
+ }
+
@Override
public void fail() throws HyracksDataException {
writer.fail();
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index f6644d6..5bb9d1a 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -59,15 +59,17 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
private final Map<String, File> sourceFileMap;
private final StaticContext staticCtx;
private final String hdfsConf;
+ private final List<String> collections;
private final Map<String, NodeControllerInfo> nodeControllerInfos;
public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx,
- String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
+ String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos, List<String> collections) {
this.nodeList = nodeList;
this.sourceFileMap = sourceFileMap;
this.staticCtx = staticCtx;
this.hdfsConf = hdfsConf;
this.nodeControllerInfos = nodeControllerInfos;
+ this.collections = collections;
}
@Override
@@ -111,8 +113,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
constraint = getClusterLocations(nodeList, ds.getPartitionCount());
} else {
rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
- scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, (VXQueryIndexingDataSource) ds, rDesc,
- this.hdfsConf, this.nodeControllerInfos);
+ scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, (VXQueryIndexingDataSource) ds, rDesc);
constraint = getClusterLocations(nodeList, ds.getPartitionCount());
}
@@ -142,7 +143,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException {
+ throws AlgebricksException {
throw new UnsupportedOperationException();
}
@@ -168,7 +169,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
throw new UnsupportedOperationException();
}
@@ -234,7 +235,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -243,7 +244,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -252,7 +253,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobSpecification jobSpec) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -263,11 +264,26 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String
JobSpecification spec) throws AlgebricksException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Map<String, String> getConfig() {
return new HashMap<>();
}
+ public List<String> getIndexCollections() {
+ return collections;
+
+ }
+
+ public boolean hasIndex(ArrayList<String> collections) {
+ boolean indexExists = false;
+ for (String collection : collections) {
+ indexExists = getIndexCollections().contains(collection);
+ if (!indexExists) {
+ break;
+ }
+ }
+ return indexExists;
+ }
}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorUtil.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorUtil.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorUtil.java
index 4706496..2c25752 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorUtil.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/IndexConstructorUtil.java
@@ -16,10 +16,14 @@
*/
package org.apache.vxquery.runtime.functions.index;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -31,38 +35,31 @@ import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
import org.apache.vxquery.exceptions.ErrorCode;
import org.apache.vxquery.exceptions.SystemException;
import org.apache.vxquery.index.IndexDocumentBuilder;
-import org.apache.vxquery.runtime.functions.index.updateIndex.MetaFileUtil;
-import org.apache.vxquery.runtime.functions.index.updateIndex.XmlMetadata;
+import org.apache.vxquery.runtime.functions.index.update.MetaFileUtil;
+import org.apache.vxquery.runtime.functions.index.update.XmlMetadata;
import org.apache.vxquery.runtime.functions.util.FunctionHelper;
import org.apache.vxquery.xmlparser.IParser;
import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
import org.apache.vxquery.xmlparser.XMLParser;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.text.SimpleDateFormat;
-import java.util.concurrent.ConcurrentHashMap;
-
public class IndexConstructorUtil {
- boolean isMetaFilePresent = false;
- MetaFileUtil metaFileUtil;
- ConcurrentHashMap<String, XmlMetadata> metadataMap = new ConcurrentHashMap<>();
+ private final TaggedValuePointable nodep = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ private final SequenceBuilder sb = new SequenceBuilder();
+ private boolean isMetaFilePresent = false;
+ private MetaFileUtil metaFileUtil;
+ private ConcurrentHashMap<String, XmlMetadata> metadataMap = new ConcurrentHashMap<>();
- public void evaluate(String collectioFolder, String indexFolder, IPointable result, UTF8StringPointable
- stringp, ByteBufferInputStream bbis, DataInputStream di, SequenceBuilder sb, ArrayBackedValueStorage abvs,
- ITreeNodeIdProvider nodeIdProvider, ArrayBackedValueStorage abvsFileNode, TaggedValuePointable nodep,
- boolean isElementPath, String nodeId) throws SystemException {
+ public void evaluate(String collectioFolder, String indexFolder, IPointable result, ArrayBackedValueStorage abvs,
+ ITreeNodeIdProvider nodeIdProvider, ArrayBackedValueStorage abvsFileNode, boolean isElementPath,
+ String nodeId) throws IOException {
- metaFileUtil = new MetaFileUtil(indexFolder);
-// metaFileUtil = .create(indexFolder);
- isMetaFilePresent = metaFileUtil.isMetaFilePresent();
- metaFileUtil.setCollection(collectioFolder);
+ metaFileUtil = new MetaFileUtil(indexFolder);
+ isMetaFilePresent = metaFileUtil.isMetaFilePresent();
+ metaFileUtil.setCollection(collectioFolder);
File collectionDirectory = new File(collectioFolder);
if (!collectionDirectory.exists()) {
- throw new RuntimeException("The collection directory (" + collectioFolder + ") does not exist.");
+ throw new IOException("The collection directory (" + collectioFolder + ") does not exist.");
}
try {
@@ -80,8 +77,7 @@ public class IndexConstructorUtil {
IndexWriter writer = new IndexWriter(dir, iwc);
//Add files to index
- indexXmlFiles(collectionDirectory, writer, isElementPath, nodep, abvsFileNode, nodeIdProvider, sb, bbis, di,
- nodeId);
+ indexXmlFiles(collectionDirectory, writer, isElementPath, abvsFileNode, nodeIdProvider, sb, nodeId);
if (!isMetaFilePresent) {
// Write metadata map to a file.
@@ -101,14 +97,13 @@ public class IndexConstructorUtil {
}
}
- /*This function goes recursively one file at a time. First it turns the file into an ABVS document node, then
+ /*
+ * This function goes recursively one file at a time. First it turns the file into an ABVS document node, then
* it indexes that document node.
*/
public void indexXmlFiles(File collectionDirectory, IndexWriter writer, boolean isElementPath,
- TaggedValuePointable nodep, ArrayBackedValueStorage abvsFileNode, ITreeNodeIdProvider nodeIdProvider,
- SequenceBuilder sb, ByteBufferInputStream bbis, DataInputStream di, String nodeId)
- throws SystemException, IOException {
-
+ ArrayBackedValueStorage abvsFileNode, ITreeNodeIdProvider nodeIdProvider, SequenceBuilder sb, String nodeId)
+ throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy, HH:mm:ss");
for (File file : collectionDirectory.listFiles()) {
@@ -116,8 +111,7 @@ public class IndexConstructorUtil {
if (readableXmlFile(file.getPath())) {
abvsFileNode.reset();
- IndexDocumentBuilder ibuilder = getIndexBuilder(file, writer, nodep, abvsFileNode, nodeIdProvider, bbis,
- di, nodeId);
+ IndexDocumentBuilder ibuilder = getIndexBuilder(file, writer, abvsFileNode, nodeIdProvider, nodeId);
ibuilder.printStart();
if (!isMetaFilePresent) {
@@ -131,22 +125,21 @@ public class IndexConstructorUtil {
} else if (file.isDirectory()) {
// Consider all XML file in sub directories.
- indexXmlFiles(file, writer, isElementPath, nodep, abvsFileNode, nodeIdProvider, sb, bbis, di, nodeId);
+ indexXmlFiles(file, writer, isElementPath, abvsFileNode, nodeIdProvider, sb, nodeId);
}
}
}
public boolean readableXmlFile(String path) {
- return (path.toLowerCase().endsWith(".xml") || path.toLowerCase().endsWith(".xml.gz"));
+ return path.toLowerCase().endsWith(".xml") || path.toLowerCase().endsWith(".xml.gz");
}
- public IndexDocumentBuilder getIndexBuilder(File file, IndexWriter writer, TaggedValuePointable nodep,
- ArrayBackedValueStorage abvsFileNode, ITreeNodeIdProvider nodeIdProvider, ByteBufferInputStream bbis,
- DataInputStream di, String nodeId) throws IOException {
+ public IndexDocumentBuilder getIndexBuilder(File file, IndexWriter writer, ArrayBackedValueStorage abvsFileNode,
+ ITreeNodeIdProvider nodeIdProvider, String nodeId) throws IOException {
//Get the document node
IParser parser = new XMLParser(false, nodeIdProvider, nodeId);
- FunctionHelper.readInDocFromString(file.getPath(), bbis, di, abvsFileNode, parser);
+ FunctionHelper.readInDocFromString(file.getPath(), abvsFileNode, parser);
nodep.set(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(), abvsFileNode.getLength());
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexScalarEvaluatorFactory.java
deleted file mode 100644
index 6677bd9..0000000
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexScalarEvaluatorFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.vxquery.runtime.functions.index;
-
-import java.io.IOException;
-
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
-import org.apache.vxquery.exceptions.ErrorCode;
-import org.apache.vxquery.exceptions.SystemException;
-import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator;
-import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluatorFactory;
-import org.apache.vxquery.runtime.functions.index.indexCentralizer.IndexCentralizerUtil;
-
-public class ShowIndexScalarEvaluatorFactory extends AbstractTaggedValueArgumentScalarEvaluatorFactory {
- private static final long serialVersionUID = 1L;
-
- public ShowIndexScalarEvaluatorFactory(IScalarEvaluatorFactory[] args) {
- super(args);
- }
-
- @Override
- protected IScalarEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args)
- throws HyracksDataException {
- final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
- final SequenceBuilder sb = new SequenceBuilder();
-
- return new AbstractTaggedValueArgumentScalarEvaluator(args) {
- @Override
- protected void evaluate(TaggedValuePointable[] args, IPointable result) throws SystemException {
- try {
- abvs.reset();
- sb.reset(abvs);
- IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil(
- ctx.getIOManager().getIODevices().get(0).getMount());
- indexCentralizerUtil.readIndexDirectory();
- indexCentralizerUtil.getAllCollections(sb);
- sb.finish();
- result.set(abvs);
- } catch (IOException e) {
- throw new SystemException(ErrorCode.SYSE0001, e);
- }
-
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java
new file mode 100644
index 0000000..6b18b33
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/ShowIndexesScalarEvaluatorFactory.java
@@ -0,0 +1,67 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.vxquery.runtime.functions.index;
+
+import java.io.IOException;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluatorFactory;
+import org.apache.vxquery.runtime.functions.index.centralizer.IndexCentralizerUtil;
+
+public class ShowIndexesScalarEvaluatorFactory extends AbstractTaggedValueArgumentScalarEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public ShowIndexesScalarEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ super(args);
+ }
+
+ @Override
+ protected IScalarEvaluator createEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args)
+ throws HyracksDataException {
+ final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+ final SequenceBuilder sb = new SequenceBuilder();
+
+ return new AbstractTaggedValueArgumentScalarEvaluator(args) {
+ @Override
+ protected void evaluate(TaggedValuePointable[] args, IPointable result) throws SystemException {
+ try {
+ abvs.reset();
+ sb.reset(abvs);
+ IndexCentralizerUtil indexCentralizerUtil = new IndexCentralizerUtil(
+ ctx.getIOManager().getIODevices().get(0).getMount());
+ indexCentralizerUtil.readIndexDirectory();
+ indexCentralizerUtil.getAllCollections(sb);
+ sb.finish();
+ result.set(abvs);
+ } catch (IOException e) {
+ throw new SystemException(ErrorCode.SYSE0001, e);
+ }
+
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/4a38f670/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/VXQueryIndexReader.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/VXQueryIndexReader.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/VXQueryIndexReader.java
index 8750849..cf781ab 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/VXQueryIndexReader.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/index/VXQueryIndexReader.java
@@ -16,11 +16,17 @@
*/
package org.apache.vxquery.runtime.functions.index;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.comm.IFrameFieldAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
@@ -32,21 +38,21 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
+import org.apache.vxquery.context.DynamicContext;
import org.apache.vxquery.exceptions.ErrorCode;
import org.apache.vxquery.exceptions.SystemException;
import org.apache.vxquery.index.IndexAttributes;
+import org.apache.vxquery.runtime.functions.util.FunctionHelper;
+import org.apache.vxquery.types.ElementType;
+import org.apache.vxquery.types.NameTest;
+import org.apache.vxquery.types.NodeType;
+import org.apache.vxquery.types.SequenceType;
import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
import org.apache.vxquery.xmlparser.SAXContentHandler;
import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-
public class VXQueryIndexReader {
private ArrayBackedValueStorage nodeAbvs = new ArrayBackedValueStorage();
@@ -55,10 +61,7 @@ public class VXQueryIndexReader {
private int indexLength;
private String elementPath;
private String indexName;
-
- private ByteBufferInputStream bbis = new ByteBufferInputStream();
- private DataInputStream di = new DataInputStream(bbis);
-
+ private List<SequenceType> childSequenceTypes;
private IndexReader reader;
private IndexSearcher searcher;
private QueryParser parser;
@@ -68,14 +71,40 @@ public class VXQueryIndexReader {
private Document doc;
private List<IndexableField> fields;
private IHyracksTaskContext ctx;
+ private String[] childLocalName = null;
+ private IFrameFieldAppender appender;
+ private boolean firstElement;
- public VXQueryIndexReader(IHyracksTaskContext context, String indexPath, String elementPath) {
+ public VXQueryIndexReader(IHyracksTaskContext context, String indexPath, List<Integer> childSeq,
+ IFrameFieldAppender appender) {
this.ctx = context;
this.indexName = indexPath;
- this.elementPath = elementPath;
+ this.appender = appender;
+ final DynamicContext dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData();
+ childSequenceTypes = new ArrayList<>();
+ for (int typeCode : childSeq) {
+ childSequenceTypes.add(dCtx.getStaticContext().lookupSequenceType(typeCode));
+ }
+ childLocalName = new String[childSequenceTypes.size()];
+ int index = 0;
+ StringBuilder stb = new StringBuilder();
+ stb.append("/");
+ for (SequenceType sType : childSequenceTypes) {
+ NodeType nodeType = (NodeType) sType.getItemType();
+ ElementType eType = (ElementType) nodeType;
+ NameTest nameTest = eType.getNameTest();
+ childLocalName[index] = FunctionHelper.getStringFromBytes(nameTest.getLocalName());
+
+ stb.append(childLocalName[index]);
+ if (index != childSequenceTypes.size() - 1) {
+ stb.append("/");
+ }
+ ++index;
+ }
+ elementPath = stb.toString();
}
- public boolean step(IPointable result) throws AlgebricksException {
+ public boolean step(IPointable result, IFrameWriter writer, int tupleIndex) throws AlgebricksException {
/*each step will create a tuple for a single xml file
* This is done using the parse function
* checkoverflow is used throughout. This is because memory might not be
@@ -88,6 +117,8 @@ public class VXQueryIndexReader {
//TODO: now we get back the entire document
doc = searcher.doc(hits[indexPlace].doc);
fields = doc.getFields();
+ handler.setupElementWriter(writer, tupleIndex);
+ this.firstElement = true;
parse(nodeAbvs);
} catch (IOException e) {
throw new AlgebricksException(e);
@@ -103,7 +134,7 @@ public class VXQueryIndexReader {
int partition = ctx.getTaskAttemptId().getTaskId().getPartition();
ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider((short) partition);
- handler = new SAXContentHandler(false, nodeIdProvider, true);
+ handler = new SAXContentHandler(false, nodeIdProvider, appender, childSequenceTypes);
nodeAbvs.reset();
indexPlace = 0;
@@ -125,7 +156,7 @@ public class VXQueryIndexReader {
String queryString = elementPath.replaceAll("/", ".");
queryString = "item:" + queryString + "*";
- int lastslash = elementPath.lastIndexOf("/");
+ int lastslash = elementPath.lastIndexOf('/');
elementPath = elementPath.substring(0, lastslash) + ":" + elementPath.substring(lastslash + 1);
elementPath = elementPath.replaceAll("/", ".") + ".element";
@@ -135,31 +166,25 @@ public class VXQueryIndexReader {
//TODO: Right now it only returns 1000000 results
results = searcher.search(query, 1000000);
-
} catch (Exception e) {
- throw new SystemException(null);
+ throw new SystemException(null, e);
}
hits = results.scoreDocs;
- System.out.println("found: " + results.totalHits);
indexPlace = 0;
indexLength = hits.length;
-
}
public void parse(ArrayBackedValueStorage abvsFileNode) throws IOException {
try {
- handler.startDocument();
-
for (int i = 0; i < fields.size(); i++) {
String fieldValue = fields.get(i).stringValue();
if (fieldValue.equals(elementPath)) {
+ handler.startDocument();
+ this.firstElement = true;
buildElement(abvsFileNode, i);
}
}
-
- handler.endDocument();
- handler.writeDocument(abvsFileNode);
} catch (Exception e) {
throw new IOException(e);
}
@@ -167,6 +192,7 @@ public class VXQueryIndexReader {
private int buildElement(ArrayBackedValueStorage abvsFileNode, int fieldNum) throws SAXException {
int whereIFinish = fieldNum;
+ int firstFinish;
IndexableField field = fields.get(fieldNum);
String contents = field.stringValue();
String uri = "";
@@ -176,18 +202,37 @@ public class VXQueryIndexReader {
String type = contents.substring(lastDot + 1);
String lastBit = contents.substring(firstColon + 1, lastDot);
- if (type.equals("textnode")) {
+ if (this.firstElement) {
+ this.firstElement = false;
+ firstFinish = whereIFinish - this.childSequenceTypes.size() + 1;
+ String firstBit = contents.substring(1, firstColon);
+ List<String> names = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+ List<String> uris = new ArrayList<>();
+ List<String> localNames = new ArrayList<>();
+ List<String> types = new ArrayList<>();
+ List<String> qNames = new ArrayList<>();
+ firstFinish = findAttributeChildren(firstFinish, names, values, uris, localNames, types, qNames);
+ Attributes atts = new IndexAttributes(names, values, uris, localNames, types, qNames);
+
+ handler.startElement(uri, firstBit, firstBit, atts);
+ buildElement(abvsFileNode, firstFinish + 1);
+ handler.endElement(uri, firstBit, firstBit);
+
+ }
+
+ if ("textnode".equals(type)) {
char[] charContents = lastBit.toCharArray();
handler.characters(charContents, 0, charContents.length);
}
- if (type.equals("element")) {
- List<String> names = new ArrayList<String>();
- List<String> values = new ArrayList<String>();
- List<String> uris = new ArrayList<String>();
- List<String> localNames = new ArrayList<String>();
- List<String> types = new ArrayList<String>();
- List<String> qNames = new ArrayList<String>();
+ if ("element".equals(type)) {
+ List<String> names = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+ List<String> uris = new ArrayList<>();
+ List<String> localNames = new ArrayList<>();
+ List<String> types = new ArrayList<>();
+ List<String> qNames = new ArrayList<>();
whereIFinish = findAttributeChildren(whereIFinish, names, values, uris, localNames, types, qNames);
Attributes atts = new IndexAttributes(names, values, uris, localNames, types, qNames);
@@ -264,7 +309,7 @@ public class VXQueryIndexReader {
String adultPath = adultId.substring(0, lastDotAdult);
adultPath = adultPath.replaceFirst(":", ".");
- return (childPath.startsWith(adultPath + ":") || childPath.startsWith(adultPath + "."));
+ return childPath.startsWith(adultPath + ":") || childPath.startsWith(adultPath + ".");
}
boolean isDirectChildAttribute(IndexableField child, IndexableField adult) {
@@ -278,7 +323,7 @@ public class VXQueryIndexReader {
String childType = childSegments[childSegments.length - 1];
- return (childPath.startsWith(adultPath + ":") && childType.equals("attribute"));
+ return childPath.startsWith(adultPath + ":") && "attribute".equals(childType);
}
}