You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/02/07 19:26:16 UTC
[13/16] incubator-rya git commit: RYA-32 Improve how metadata and
values are written to Accumulo PCJ tables
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
index 65a775f..eb0f042 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.external;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,13 +19,11 @@ package mvm.rya.indexing.external;
* under the License.
*/
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
@@ -38,25 +36,22 @@ import mvm.rya.indexing.accumulo.ConfigUtils;
import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector;
import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+import mvm.rya.indexing.external.tupleSet.PcjTables;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
import mvm.rya.rdftriplestore.inference.DoNotExpandSP;
import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
import org.openrdf.query.BindingSet;
import org.openrdf.query.Dataset;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.BinaryTupleOperator;
import org.openrdf.query.algebra.BindingSetAssignment;
import org.openrdf.query.algebra.Difference;
import org.openrdf.query.algebra.Distinct;
@@ -86,688 +81,783 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-//optimizer which matches TupleExpressions associated with pre-computed queries
-//to sub-queries of a given query. Each matched sub-query is replaced by an indexing node
-//to delegate that portion of the query to the pre-computed query index
+/**
+ * {@link QueryOptimizer} which matches TupleExpressions associated with
+ * pre-computed queries to sub-queries of a given query. Each matched sub-query
+ * is replaced by an indexing node to delegate that portion of the query to the
+ * pre-computed query index.
+ * <p>
+ *
+ * A query can be broken up into "Join segments", which are subsets of the query
+ * joined only by {@link Join} nodes. Any portions of the query that are
+ * attached by {@link BinaryTupleOperator} or {@link UnaryTupleOperator} nodes other
+ * than a Join node mark the beginning of a new Join segment. Pre-computed query
+ * indices, or {@link ExternalTupleSet} objects, are compared against the query
+ * nodes in each of its Join segments and replace any nodes which match the
+ * nodes in the ExternalTupleSet's TupleExpr.
+ *
+ */
+
public class PrecompJoinOptimizer implements QueryOptimizer, Configurable {
- private List<ExternalTupleSet> indexSet;
- private Configuration conf;
- private boolean init = false;
-
- public PrecompJoinOptimizer() {
- }
-
- public PrecompJoinOptimizer(Configuration conf) {
- this.conf = conf;
- try {
- indexSet = getAccIndices(conf);
- init = true;
- } catch (MalformedQueryException e) {
- e.printStackTrace();
- } catch (SailException e) {
- e.printStackTrace();
- } catch (QueryEvaluationException e) {
- e.printStackTrace();
- } catch (TableNotFoundException e) {
- e.printStackTrace();
- } catch (AccumuloException e) {
- e.printStackTrace();
- } catch (AccumuloSecurityException e) {
- e.printStackTrace();
- }
- }
-
- public PrecompJoinOptimizer(List<ExternalTupleSet> indices, boolean useOptimalPcj) {
- this.indexSet = indices;
- conf = new Configuration();
- conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, useOptimalPcj);
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- if (!init) {
- try {
- indexSet = getAccIndices(conf);
- init = true;
- } catch (MalformedQueryException e) {
- e.printStackTrace();
- } catch (SailException e) {
- e.printStackTrace();
- } catch (QueryEvaluationException e) {
- e.printStackTrace();
- } catch (TableNotFoundException e) {
- e.printStackTrace();
- } catch (AccumuloException e) {
- e.printStackTrace();
- } catch (AccumuloSecurityException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
-
- @Override
- public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
-
- IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(tupleExpr, indexSet);
- JoinVisitor jv = new JoinVisitor();
-
- if (ConfigUtils.getUseOptimalPCJ(conf) && indexSet.size() > 0) {
-
- //get potential relevant index combinations
- ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator(tupleExpr);
- Iterator<List<ExternalTupleSet>> iter = vic.getValidIndexCombos(iep.getNormalizedIndices());
- TupleExpr bestTup = null;
- TupleExpr tempTup = null;
- double tempCost = 0;
- double minCost = Double.MAX_VALUE;
-
- while (iter.hasNext()) {
- //apply join visitor to place external index nodes in query
- TupleExpr clone = tupleExpr.clone();
- jv.setExternalTupList(iter.next());
- jv.setSegmentFilters(new ArrayList<Filter>());
- clone.visit(jv);
-
- //get all valid execution plans for given external index combination by considering all
- //permutations of nodes in TupleExpr
- IndexPlanValidator ipv = new IndexPlanValidator(false);
- Iterator<TupleExpr> validTups = ipv.getValidTuples(TupleReArranger.getTupleReOrderings(clone).iterator());
-
- //set valid plan according to a specified cost threshold, where cost depends on specified weights
- //for number of external index nodes, common variables among joins in execution plan, and number of
- //external products in execution plan
- ThreshholdPlanSelector tps = new ThreshholdPlanSelector(tupleExpr);
- tempTup = tps.getThreshholdQueryPlan(validTups, .4, .5, .2, .3);
-
- //choose best threshhold TupleExpr among all index node combinations
- tempCost = tps.getCost(tempTup, .5, .2, .3);
- if(tempCost < minCost ) {
- minCost = tempCost;
- bestTup = tempTup;
- }
- }
- if (bestTup != null) {
- ((UnaryTupleOperator) tupleExpr).setArg(((UnaryTupleOperator) bestTup).getArg());
- }
- return;
- } else {
- if (indexSet.size() > 0) {
- jv.setExternalTupList(iep.getNormalizedIndices());
- tupleExpr.visit(jv);
- }
- return;
- }
- }
-
- protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> {
-
- private List<ExternalTupleSet> tupList;
- private List<Filter> segmentFilters = Lists.newArrayList();
-
- public void setExternalTupList(List<ExternalTupleSet> tupList) {
- this.tupList = tupList;
- }
-
- public void setSegmentFilters(List<Filter> segmentFilters) {
- this.segmentFilters = segmentFilters;
- }
-
- @Override
- public void meet(Join node) {
-
- //get all filters with bindings in this segment
- updateFilters(segmentFilters, true);
-
- try {
- if (node.getLeftArg() instanceof FixedStatementPattern && node.getRightArg() instanceof DoNotExpandSP) {
- return;
- }
-
- //get nodes in this join segment
- TupleExpr newJoin = null;
- List<QueryModelNode> args = getJoinArgs(node, new ArrayList<QueryModelNode>(), false);
- List<TupleExpr> joinArgs = Lists.newArrayList();
-
- for (QueryModelNode qNode : args) {
- assert (qNode instanceof TupleExpr);
- joinArgs.add((TupleExpr) qNode);
- }
-
- //insert all matching ExternalTupleSets in tupList into this segment
- joinArgs = matchExternalTupleSets(joinArgs, tupList);
-
- //push down any filters that have bindings in lower segments
- //and update the filters in this segment
- updateFilters(segmentFilters, false);
-
- //form join from matching ExternalTupleSets, remaining nodes, and filters
- //that can't be pushed down any further
- newJoin = getNewJoin(joinArgs, getFilterChain(segmentFilters));
-
- // Replace old join hierarchy
- node.replaceWith(newJoin);
-
- //visit remaining nodes to match ExternalTupleSets with nodes further down
- for (TupleExpr te : joinArgs) {
- if (!(te instanceof StatementPattern) && !(te instanceof ExternalTupleSet)) {
- segmentFilters = Lists.newArrayList();
- te.visit(this);
- }
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- @Override
- public void meet(Filter node) {
- segmentFilters.add(node);
- node.getArg().visit(this);
- }
-
- //chain filters together and return front and back of chain
- private List<TupleExpr> getFilterChain(List<Filter> filters) {
- List<TupleExpr> filterTopBottom = Lists.newArrayList();
- Filter filterChainTop = null;
- Filter filterChainBottom = null;
-
- for (Filter filter: filters) {
- if (filterChainTop == null) {
- filterChainTop = filter;
- } else if (filterChainBottom == null) {
- filterChainBottom = filter;
- filterChainTop.setArg(filterChainBottom);
- } else {
- filterChainBottom.setArg(filter);
- filterChainBottom = filter;
- }
- }
- if(filterChainTop != null) {
- filterTopBottom.add(filterChainTop);
- }
- if(filterChainBottom != null) {
- filterTopBottom.add(filterChainBottom);
- }
- return filterTopBottom;
- }
-
- //build newJoin node given remaining joinArgs and chain of filters
- private TupleExpr getNewJoin(List<TupleExpr> args, List<TupleExpr> filterChain) {
- TupleExpr newJoin;
- List<TupleExpr> joinArgs = Lists.newArrayList(args);
-
- if (joinArgs.size() > 1) {
- if (filterChain.size() > 0) {
- TupleExpr finalJoinArg = joinArgs.remove(0);
- TupleExpr tempJoin;
- TupleExpr temp = filterChain.get(0);
-
- if (joinArgs.size() > 1) {
- tempJoin = new Join(joinArgs.remove(0), joinArgs.remove(0));
- for (TupleExpr te : joinArgs) {
- tempJoin = new Join(tempJoin, te);
- }
- } else {
- tempJoin = joinArgs.remove(0);
- }
-
- if (filterChain.size() == 1) {
- ((Filter) temp).setArg(tempJoin);
- } else {
- ((Filter) filterChain.get(1)).setArg(tempJoin);
- }
- newJoin = new Join(temp, finalJoinArg);
- } else {
- newJoin = new Join(joinArgs.get(0), joinArgs.get(1));
- joinArgs.remove(0);
- joinArgs.remove(0);
-
- for (TupleExpr te : joinArgs) {
- newJoin = new Join(newJoin, te);
- }
- }
- } else if (joinArgs.size() == 1) {
- if (filterChain.size() > 0) {
- newJoin = filterChain.get(0);
- if (filterChain.size() == 1) {
- ((Filter) newJoin).setArg(joinArgs.get(0));
- } else {
- ((Filter) filterChain.get(1)).setArg(joinArgs.get(0));
- }
- } else {
- newJoin = joinArgs.get(0);
- }
- } else {
- throw new IllegalStateException("JoinArgs size cannot be zero.");
- }
- return newJoin;
- }
-
-
- private List<TupleExpr> matchExternalTupleSets(List<TupleExpr> joinArgs, List<ExternalTupleSet> tupList) {
-
- Set<QueryModelNode> argSet = Sets.newHashSet();
- argSet.addAll(joinArgs);
-
- if(argSet.size() < joinArgs.size()) {
- throw new IllegalArgumentException("Query has duplicate nodes in segment!");
- }
-
- Set<QueryModelNode> firstJoinFilterCond = Sets.newHashSet();
-
- for(Filter filter: segmentFilters) {
- firstJoinFilterCond.add(filter.getCondition());
- }
-
- argSet.addAll(firstJoinFilterCond);
-
- //see if ExternalTupleSet nodes are a subset of joinArgs, and if so, replacing matching nodes
- //with ExternalTupleSet
- for (ExternalTupleSet tup : tupList) {
- TupleExpr tupleArg = tup.getTupleExpr();
- if (isTupleValid(tupleArg)) {
- List<QueryModelNode> tupJoinArgs = getJoinArgs(tupleArg,
- new ArrayList<QueryModelNode>(), true);
- Set<QueryModelNode> tupJoinArgSet = Sets.newHashSet(tupJoinArgs);
- if(tupJoinArgSet.size() < tupJoinArgs.size()) {
- throw new IllegalArgumentException("ExternalTuple contains duplicate nodes!");
- }
- if (argSet.containsAll(tupJoinArgSet)) {
- argSet = Sets.newHashSet(Sets.difference(argSet, tupJoinArgSet));
- argSet.add((ExternalTupleSet) tup.clone());
- }
- }
- }
-
- //update segment filters by removing those use in ExternalTupleSet
- Iterator<Filter> iter = segmentFilters.iterator();
-
- while(iter.hasNext()) {
- Filter filt = iter.next();
- if(!argSet.contains(filt.getCondition())) {
- filt.replaceWith(filt.getArg());
- iter.remove();
- }
- }
-
- //update joinArgs
- joinArgs = Lists.newArrayList();
- for(QueryModelNode node: argSet) {
- if(!(node instanceof ValueExpr)) {
- joinArgs.add((TupleExpr)node);
- }
- }
-
- return joinArgs;
- }
-
-
- private void updateFilters(List<Filter> filters, boolean firstJoin) {
-
- Iterator<Filter> iter = segmentFilters.iterator();
-
- while (iter.hasNext()) {
- if (!FilterRelocator.relocate(iter.next(), firstJoin)) {
- iter.remove();
- }
- }
- }
-
- protected List<QueryModelNode> getJoinArgs(TupleExpr tupleExpr, List<QueryModelNode> joinArgs, boolean getFilters) {
- if (tupleExpr instanceof Join) {
- if (!(((Join) tupleExpr).getLeftArg() instanceof FixedStatementPattern)
- && !(((Join) tupleExpr).getRightArg() instanceof DoNotExpandSP)) {
- Join join = (Join) tupleExpr;
- getJoinArgs(join.getLeftArg(), joinArgs, getFilters);
- getJoinArgs(join.getRightArg(), joinArgs, getFilters);
- }
- } else if(tupleExpr instanceof Filter) {
- if (getFilters) {
- joinArgs.add(((Filter) tupleExpr).getCondition());
- }
- getJoinArgs(((Filter)tupleExpr).getArg(), joinArgs, getFilters);
- } else if(tupleExpr instanceof Projection) {
- getJoinArgs(((Projection)tupleExpr).getArg(), joinArgs, getFilters);
- } else {
- joinArgs.add(tupleExpr);
- }
-
- return joinArgs;
- }
- }
-
- protected static class FilterRelocator extends QueryModelVisitorBase<RuntimeException> {
-
-
- protected final Filter filter;
-
- protected final Set<String> filterVars;
- private boolean stopAtFirstJoin = false;
- private boolean isFirstJoinFilter = false;
- private boolean inSegment = true;
-
-
- public FilterRelocator(Filter filter) {
- this.filter = filter;
- filterVars = VarNameCollector.process(filter.getCondition());
- }
-
- public FilterRelocator(Filter filter, boolean stopAtFirstJoin) {
- this.filter = filter;
- filterVars = VarNameCollector.process(filter.getCondition());
- this.stopAtFirstJoin = stopAtFirstJoin;
- }
-
- public static boolean relocate(Filter filter) {
- FilterRelocator fr = new FilterRelocator(filter);
- filter.visit(fr);
- return fr.inSegment;
- }
-
- public static boolean relocate(Filter filter, boolean stopAtFirstJoin) {
- if (stopAtFirstJoin) {
- FilterRelocator fr = new FilterRelocator(filter, stopAtFirstJoin);
- filter.visit(fr);
- return fr.isFirstJoinFilter;
- } else {
- FilterRelocator fr = new FilterRelocator(filter);
- filter.visit(fr);
- return fr.inSegment;
- }
- }
-
-
- @Override
- protected void meetNode(QueryModelNode node) {
- // By default, do not traverse
- assert node instanceof TupleExpr;
-
- if(node instanceof UnaryTupleOperator) {
- if (((UnaryTupleOperator)node).getArg().getBindingNames().containsAll(filterVars)) {
- if (stopAtFirstJoin) {
- ((UnaryTupleOperator) node).getArg().visit(this);
- } else {
- inSegment = false;
- relocate(filter, ((UnaryTupleOperator) node).getArg());
- }
- }
- }
-
- relocate(filter, (TupleExpr) node);
- }
-
-
- @Override
- public void meet(Join join) {
-
- if (stopAtFirstJoin) {
- isFirstJoinFilter = true;
- relocate(filter, join);
- } else {
-
- if (join.getLeftArg().getBindingNames().containsAll(filterVars)) {
- // All required vars are bound by the left expr
- join.getLeftArg().visit(this);
- } else if (join.getRightArg().getBindingNames().containsAll(filterVars)) {
- // All required vars are bound by the right expr
- join.getRightArg().visit(this);
- } else {
- relocate(filter, join);
- }
- }
- }
-
- @Override
- public void meet(LeftJoin leftJoin) {
-
- if (leftJoin.getLeftArg().getBindingNames().containsAll(filterVars)) {
- inSegment = false;
- if (stopAtFirstJoin) {
- leftJoin.getLeftArg().visit(this);
- } else {
- relocate(filter, leftJoin.getLeftArg());
- }
- }
- else {
- relocate(filter, leftJoin);
- }
- }
-
- @Override
- public void meet(Union union) {
- Filter clone = new Filter();
- clone.setCondition(filter.getCondition().clone());
-
- relocate(filter, union.getLeftArg());
- relocate(clone, union.getRightArg());
-
- inSegment = false;
-
- }
-
- @Override
- public void meet(Difference node) {
- Filter clone = new Filter();
- clone.setCondition(filter.getCondition().clone());
-
- relocate(filter, node.getLeftArg());
- relocate(clone, node.getRightArg());
-
- inSegment = false;
-
- }
-
- @Override
- public void meet(Intersection node) {
- Filter clone = new Filter();
- clone.setCondition(filter.getCondition().clone());
-
- relocate(filter, node.getLeftArg());
- relocate(clone, node.getRightArg());
-
- inSegment = false;
-
- }
-
- @Override
- public void meet(Extension node) {
- if (node.getArg().getBindingNames().containsAll(filterVars)) {
- if (stopAtFirstJoin) {
- node.getArg().visit(this);
- } else {
- relocate(filter, node.getArg());
- inSegment = false;
- }
- }
- else {
- relocate(filter, node);
- }
- }
-
- @Override
- public void meet(EmptySet node) {
- if (filter.getParentNode() != null) {
- // Remove filter from its original location
- filter.replaceWith(filter.getArg());
- }
- }
-
- @Override
- public void meet(Filter filter) {
- // Filters are commutative
- filter.getArg().visit(this);
- }
-
- @Override
- public void meet(Distinct node) {
- node.getArg().visit(this);
- }
-
- @Override
- public void meet(Order node) {
- node.getArg().visit(this);
- }
-
- @Override
- public void meet(QueryRoot node) {
- node.getArg().visit(this);
- }
-
- @Override
- public void meet(Reduced node) {
- node.getArg().visit(this);
- }
-
- protected void relocate(Filter filter, TupleExpr newFilterArg) {
- if (filter.getArg() != newFilterArg) {
- if (filter.getParentNode() != null) {
- // Remove filter from its original location
- filter.replaceWith(filter.getArg());
- }
-
- // Insert filter at the new location
- newFilterArg.replaceWith(filter);
- filter.setArg(newFilterArg);
- }
- }
- }
-
-
- private static boolean isTupleValid(QueryModelNode node) {
-
- ValidQueryVisitor vqv = new ValidQueryVisitor();
- node.visit(vqv);
-
- if (vqv.isValid() && vqv.getSPs().size() > 1) {
- if(vqv.getFilters().size() > 0) {
- Set<String> spVars = getVarNames(vqv.getSPs());
- Set<String> fVarNames = getVarNames(vqv.getFilters());
- //check that all vars contained in filters also occur in SPs
- return Sets.intersection(fVarNames,spVars).equals(fVarNames);
- } else {
- return true;
- }
- } else {
- return false;
- }
- }
-
-
- private static Set<String> getVarNames(Collection<QueryModelNode> nodes) {
-
- List<String> tempVars;
- Set<String> nodeVarNames = Sets.newHashSet();
-
- for (QueryModelNode s : nodes) {
- tempVars = VarCollector.process(s);
- for (String t : tempVars)
- nodeVarNames.add(t);
- }
- return nodeVarNames;
- }
-
-
- private static class ValidQueryVisitor extends QueryModelVisitorBase<RuntimeException> {
-
- private boolean isValid = true;
- private Set<QueryModelNode> filterSet = Sets.newHashSet();
- private Set<QueryModelNode> spSet = Sets.newHashSet();
-
- public Set<QueryModelNode> getFilters() {
- return filterSet;
- }
-
- public Set<QueryModelNode> getSPs() {
- return spSet;
- }
-
- public boolean isValid() {
- return isValid;
- }
-
- public void meet(Projection node) {
- node.getArg().visit(this);
- }
-
- @Override
- public void meet(Filter node) {
- filterSet.add(node.getCondition());
- node.getArg().visit(this);
- }
-
- @Override
- public void meet(StatementPattern node) {
- spSet.add(node);
- }
-
- public void meetNode(QueryModelNode node) {
-
- if (!((node instanceof Join) || (node instanceof StatementPattern) || (node instanceof BindingSetAssignment) ||
- (node instanceof Var) || (node instanceof Union) || (node instanceof LeftJoin))) {
- isValid = false;
- return;
-
- } else{
- super.meetNode(node);
- }
- }
-
- }
-
-
- private static List<ExternalTupleSet> getAccIndices(Configuration conf) throws MalformedQueryException,
- SailException, QueryEvaluationException, TableNotFoundException, AccumuloException,
- AccumuloSecurityException {
-
- List<String> tables = null;
-
- if (conf instanceof RdfCloudTripleStoreConfiguration) {
- tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables();
- }
-
- String tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
- Connector c = ConfigUtils.getConnector(conf);
- Map<String, String> indexTables = Maps.newLinkedHashMap();
-
- if (tables != null && !tables.isEmpty()) {
- for (String table : tables) {
- Scanner s = c.createScanner(table, new Authorizations());
- s.setRange(Range.exact(new Text("~SPARQL")));
- for (Entry<Key, Value> e : s) {
- indexTables.put(table, e.getValue().toString());
- }
- }
- } else {
- for (String table : c.tableOperations().list()) {
- if (table.startsWith(tablePrefix + "INDEX")) {
- Scanner s = c.createScanner(table, new Authorizations());
- s.setRange(Range.exact(new Text("~SPARQL")));
- for (Entry<Key, Value> e : s) {
- indexTables.put(table, e.getValue().toString());
- }
- }
- }
-
- }
- List<ExternalTupleSet> index = Lists.newArrayList();
-
- if (indexTables.isEmpty()) {
- System.out.println("No Index found");
- } else {
- for (String table : indexTables.keySet()) {
- String indexSparqlString = indexTables.get(table);
- index.add(new AccumuloIndexSet(indexSparqlString, c, table));
- }
- }
- return index;
- }
+ private List<ExternalTupleSet> indexSet;
+ private Configuration conf;
+ private boolean init = false;
+
+ public PrecompJoinOptimizer() {
+ }
+
+ public PrecompJoinOptimizer(Configuration conf) {
+ this.conf = conf;
+ try {
+ indexSet = getAccIndices(conf);
+ } catch (MalformedQueryException | SailException
+ | QueryEvaluationException | TableNotFoundException
+ | AccumuloException | AccumuloSecurityException | PcjException e) {
+ e.printStackTrace();
+ }
+ init = true;
+ }
+
+ public PrecompJoinOptimizer(List<ExternalTupleSet> indices,
+ boolean useOptimalPcj) {
+ this.indexSet = indices;
+ conf = new Configuration();
+ conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, useOptimalPcj);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ if (!init) {
+ try {
+ indexSet = getAccIndices(conf);
+ } catch (MalformedQueryException | SailException
+ | QueryEvaluationException | TableNotFoundException
+ | AccumuloException | AccumuloSecurityException
+ | PcjException e) {
+ e.printStackTrace();
+ }
+ init = true;
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * @param tupleExpr
+ * -- query whose query plan will be optimized -- the
+ * ExternalTupleSet nodes held by this optimizer are placed in the
+ * query plan wherever an ExternalTupleSet's TupleExpr matches one
+ * of the query's sub-queries
+ */
+ @Override
+ public void optimize(TupleExpr tupleExpr, Dataset dataset,
+ BindingSet bindings) {
+
+ final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(
+ tupleExpr, indexSet);
+ final JoinVisitor jv = new JoinVisitor();
+
+ if (ConfigUtils.getUseOptimalPCJ(conf) && indexSet.size() > 0) {
+
+ // get potential relevant index combinations
+ final ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator(
+ tupleExpr);
+ final Iterator<List<ExternalTupleSet>> iter = vic
+ .getValidIndexCombos(iep.getNormalizedIndices());
+ TupleExpr bestTup = null;
+ TupleExpr tempTup = null;
+ double tempCost = 0;
+ double minCost = Double.MAX_VALUE;
+
+ while (iter.hasNext()) {
+ // apply join visitor to place external index nodes in query
+ final TupleExpr clone = tupleExpr.clone();
+ jv.setExternalTupList(iter.next());
+ jv.setSegmentFilters(new ArrayList<Filter>());
+ clone.visit(jv);
+
+ // get all valid execution plans for given external index
+ // combination by considering all
+ // permutations of nodes in TupleExpr
+ final IndexPlanValidator ipv = new IndexPlanValidator(false);
+ final Iterator<TupleExpr> validTups = ipv
+ .getValidTuples(TupleReArranger.getTupleReOrderings(
+ clone).iterator());
+
+ // set valid plan according to a specified cost threshold, where
+ // cost depends on specified weights
+ // for number of external index nodes, common variables among
+ // joins in execution plan, and number of
+ // external products in execution plan
+ final ThreshholdPlanSelector tps = new ThreshholdPlanSelector(
+ tupleExpr);
+ tempTup = tps.getThreshholdQueryPlan(validTups, .4, .5, .2, .3);
+
+ // choose best threshhold TupleExpr among all index node
+ // combinations
+ tempCost = tps.getCost(tempTup, .5, .2, .3);
+ if (tempCost < minCost) {
+ minCost = tempCost;
+ bestTup = tempTup;
+ }
+ }
+ if (bestTup != null) {
+ ((UnaryTupleOperator) tupleExpr)
+ .setArg(((UnaryTupleOperator) bestTup).getArg());
+ }
+ return;
+ } else {
+ if (indexSet.size() > 0) {
+ jv.setExternalTupList(iep.getNormalizedIndices());
+ tupleExpr.visit(jv);
+ }
+ return;
+ }
+ }
+
+ /**
+ * Given a list of {@link ExternalTupleSet}, this visitor navigates the query
+ * {@link TupleExpr} specified in
+ * {@link PrecompJoinOptimizer#optimize(TupleExpr, Dataset, BindingSet)},
+ * matches the TupleExpr in each ExternalTupleSet with sub-queries of the
+ * query, and replaces each matched sub-query with the ExternalTupleSet node.
+ *
+ */
+ protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> {
+
+ private List<ExternalTupleSet> tupList;
+ private List<Filter> segmentFilters = Lists.newArrayList();
+
+ public void setExternalTupList(List<ExternalTupleSet> tupList) {
+ this.tupList = tupList;
+ }
+
+ public void setSegmentFilters(List<Filter> segmentFilters) {
+ this.segmentFilters = segmentFilters;
+ }
+
+ @Override
+ public void meet(Join node) {
+
+ // get all filters with bindings in this segment
+ updateFilters(segmentFilters, true);
+
+ try {
+ if (node.getLeftArg() instanceof FixedStatementPattern
+ && node.getRightArg() instanceof DoNotExpandSP) {
+ return;
+ }
+
+ // get nodes in this join segment
+ TupleExpr newJoin = null;
+ final List<QueryModelNode> args = getJoinArgs(node,
+ new ArrayList<QueryModelNode>(), false);
+ List<TupleExpr> joinArgs = Lists.newArrayList();
+
+ for (final QueryModelNode qNode : args) {
+ assert qNode instanceof TupleExpr;
+ joinArgs.add((TupleExpr) qNode);
+ }
+
+ // insert all matching ExternalTupleSets in tupList into this
+ // segment
+ joinArgs = matchExternalTupleSets(joinArgs, tupList);
+
+ // push down any filters that have bindings in lower segments
+ // and update the filters in this segment
+ updateFilters(segmentFilters, false);
+
+ // form join from matching ExternalTupleSets, remaining nodes,
+ // and filters
+ // that can't be pushed down any further
+ newJoin = getNewJoin(joinArgs, getFilterChain(segmentFilters));
+
+ // Replace old join hierarchy
+ node.replaceWith(newJoin);
+
+ // visit remaining nodes to match ExternalTupleSets with nodes
+ // further down
+ for (final TupleExpr te : joinArgs) {
+ if (!(te instanceof StatementPattern)
+ && !(te instanceof ExternalTupleSet)) {
+ segmentFilters = Lists.newArrayList();
+ te.visit(this);
+ }
+ }
+
+ } catch (final Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void meet(Filter node) {
+ segmentFilters.add(node);
+ node.getArg().visit(this);
+ }
+
+ // chain filters together and return front and back of chain
+ private List<TupleExpr> getFilterChain(List<Filter> filters) {
+ final List<TupleExpr> filterTopBottom = Lists.newArrayList();
+ Filter filterChainTop = null;
+ Filter filterChainBottom = null;
+
+ for (final Filter filter : filters) {
+ filter.replaceWith(filter.getArg());
+ if (filterChainTop == null) {
+ filterChainTop = filter;
+ } else if (filterChainBottom == null) {
+ filterChainBottom = filter;
+ filterChainTop.setArg(filterChainBottom);
+ } else {
+ filterChainBottom.setArg(filter);
+ filterChainBottom = filter;
+ }
+ }
+ if (filterChainTop != null) {
+ filterTopBottom.add(filterChainTop);
+ }
+ if (filterChainBottom != null) {
+ filterTopBottom.add(filterChainBottom);
+ }
+ return filterTopBottom;
+ }
+
+ // build newJoin node given remaining joinArgs and chain of filters
+ private TupleExpr getNewJoin(List<TupleExpr> args,
+ List<TupleExpr> filterChain) {
+ TupleExpr newJoin;
+ final List<TupleExpr> joinArgs = Lists.newArrayList(args);
+
+ if (joinArgs.size() > 1) {
+ if (filterChain.size() > 0) {
+ final TupleExpr finalJoinArg = joinArgs.remove(0);
+ TupleExpr tempJoin;
+ final TupleExpr temp = filterChain.get(0);
+
+ if (joinArgs.size() > 1) {
+ tempJoin = new Join(joinArgs.remove(0),
+ joinArgs.remove(0));
+ for (final TupleExpr te : joinArgs) {
+ tempJoin = new Join(tempJoin, te);
+ }
+ } else {
+ tempJoin = joinArgs.remove(0);
+ }
+
+ if (filterChain.size() == 1) {
+ ((Filter) temp).setArg(tempJoin);
+ } else {
+ ((Filter) filterChain.get(1)).setArg(tempJoin);
+ }
+ newJoin = new Join(temp, finalJoinArg);
+ } else {
+ newJoin = new Join(joinArgs.get(0), joinArgs.get(1));
+ joinArgs.remove(0);
+ joinArgs.remove(0);
+
+ for (final TupleExpr te : joinArgs) {
+ newJoin = new Join(newJoin, te);
+ }
+ }
+ } else if (joinArgs.size() == 1) {
+ if (filterChain.size() > 0) {
+ newJoin = filterChain.get(0);
+ if (filterChain.size() == 1) {
+ ((Filter) newJoin).setArg(joinArgs.get(0));
+ } else {
+ ((Filter) filterChain.get(1)).setArg(joinArgs.get(0));
+ }
+ } else {
+ newJoin = joinArgs.get(0);
+ }
+ } else {
+ throw new IllegalStateException("JoinArgs size cannot be zero.");
+ }
+ return newJoin;
+ }
+
+ /**
+ *
+ * @param joinArgs
+ * -- list of non-join nodes contained in the join segment
+ * @param tupList
+ * -- list of indices to match sub-queries in this join
+ * segment
+ * @return updated list of non-join nodes, where any nodes matching an
+ * index are replaced by that index
+ */
+ private List<TupleExpr> matchExternalTupleSets(
+ List<TupleExpr> joinArgs, List<ExternalTupleSet> tupList) {
+
+ List<TupleExpr> bsaList = new ArrayList<>();
+ Set<QueryModelNode> argSet = Sets.newHashSet();
+ for (TupleExpr te : joinArgs) {
+ if (te instanceof BindingSetAssignment) {
+ bsaList.add(te);
+ } else {
+ argSet.add(te);
+ }
+ }
+
+ if (argSet.size() + bsaList.size() < joinArgs.size()) {
+ throw new IllegalArgumentException(
+ "Query has duplicate nodes in segment!");
+ }
+
+ final Set<QueryModelNode> firstJoinFilterCond = Sets.newHashSet();
+
+ for (final Filter filter : segmentFilters) {
+ firstJoinFilterCond.add(filter.getCondition());
+ }
+
+ argSet.addAll(firstJoinFilterCond);
+
+ // see if ExternalTupleSet nodes are a subset of joinArgs, and if
+ // so, replace the matching nodes
+ // with the ExternalTupleSet
+ for (final ExternalTupleSet tup : tupList) {
+ final TupleExpr tupleArg = tup.getTupleExpr();
+ if (isTupleValid(tupleArg)) {
+ final List<QueryModelNode> tupJoinArgs = getJoinArgs(
+ tupleArg, new ArrayList<QueryModelNode>(), true);
+ final Set<QueryModelNode> tupJoinArgSet = Sets
+ .newHashSet(tupJoinArgs);
+ if (tupJoinArgSet.size() < tupJoinArgs.size()) {
+ throw new IllegalArgumentException(
+ "ExternalTuple contains duplicate nodes!");
+ }
+ if (argSet.containsAll(tupJoinArgSet)) {
+ argSet = Sets.newHashSet(Sets.difference(argSet,
+ tupJoinArgSet));
+ argSet.add(tup.clone());
+ }
+ }
+ }
+
+ // update segment filters by removing those whose condition is no
+ // longer in argSet, i.e. was absorbed by an ExternalTupleSet
+ final Iterator<Filter> iter = segmentFilters.iterator();
+
+ while (iter.hasNext()) {
+ final Filter filt = iter.next();
+ if (!argSet.contains(filt.getCondition())) {
+ filt.replaceWith(filt.getArg());
+ iter.remove();
+ }
+ }
+
+ // update joinArgs
+ joinArgs = Lists.newArrayList();
+ for (final QueryModelNode node : argSet) {
+ if (!(node instanceof ValueExpr)) {
+ joinArgs.add((TupleExpr) node);
+ }
+ }
+ joinArgs.addAll(bsaList);
+
+ return joinArgs;
+ }
+
+ private void updateFilters(List<Filter> filters, boolean firstJoin) {
+
+ final Iterator<Filter> iter = segmentFilters.iterator();
+
+ while (iter.hasNext()) {
+ if (!FilterRelocator.relocate(iter.next(), firstJoin)) {
+ iter.remove();
+ }
+ }
+ }
+
+ /**
+ *
+ * @param tupleExpr
+ * -- the query
+ * @param joinArgs
+ * -- the non-join nodes contained in the join segment
+ * @param getFilters
+ * -- if true, the condition of each Filter encountered is also
+ * added to joinArgs
+ * @return -- the non-join nodes contained in the join segment
+ */
+ protected List<QueryModelNode> getJoinArgs(TupleExpr tupleExpr,
+ List<QueryModelNode> joinArgs, boolean getFilters) {
+ if (tupleExpr instanceof Join) {
+ if (!(((Join) tupleExpr).getLeftArg() instanceof FixedStatementPattern)
+ && !(((Join) tupleExpr).getRightArg() instanceof DoNotExpandSP)) {
+ final Join join = (Join) tupleExpr;
+ getJoinArgs(join.getLeftArg(), joinArgs, getFilters);
+ getJoinArgs(join.getRightArg(), joinArgs, getFilters);
+ }
+ } else if (tupleExpr instanceof Filter) {
+ if (getFilters) {
+ joinArgs.add(((Filter) tupleExpr).getCondition());
+ }
+ getJoinArgs(((Filter) tupleExpr).getArg(), joinArgs, getFilters);
+ } else if (tupleExpr instanceof Projection) {
+ getJoinArgs(((Projection) tupleExpr).getArg(), joinArgs,
+ getFilters);
+ } else {
+ joinArgs.add(tupleExpr);
+ }
+
+ return joinArgs;
+ }
+ }
+
+ /**
+ * Relocates filters based on the binding variables contained in the
+ * {@link Filter}. If you don't specify the FilterRelocator to stop at the
+ * first {@link Join}, the relocator pushes the filter as far down the query
+ * plan as possible, checking if the nodes below contain its binding
+ * variables. If stopAtFirstJoin = true, the Filter is inserted at the first
+ * Join node encountered. The relocator tracks whether the node stays in the
+ * join segment or is inserted outside of the Join segment and returns true
+ * if the Filter stays in the segment and false otherwise.
+ *
+ */
+
+ protected static class FilterRelocator extends
+ QueryModelVisitorBase<RuntimeException> {
+
+ protected final Filter filter;
+
+ protected final Set<String> filterVars;
+ private boolean stopAtFirstJoin = false;
+ private boolean isFirstJoinFilter = false;
+ private boolean inSegment = true;
+
+ public FilterRelocator(Filter filter) {
+ this.filter = filter;
+ filterVars = VarNameCollector.process(filter.getCondition());
+ }
+
+ public FilterRelocator(Filter filter, boolean stopAtFirstJoin) {
+ this.filter = filter;
+ filterVars = VarNameCollector.process(filter.getCondition());
+ this.stopAtFirstJoin = stopAtFirstJoin;
+ }
+
+ public static boolean relocate(Filter filter) {
+ final FilterRelocator fr = new FilterRelocator(filter);
+ filter.visit(fr);
+ return fr.inSegment;
+ }
+
+ public static boolean relocate(Filter filter, boolean stopAtFirstJoin) {
+ if (stopAtFirstJoin) {
+ final FilterRelocator fr = new FilterRelocator(filter,
+ stopAtFirstJoin);
+ filter.visit(fr);
+ return fr.isFirstJoinFilter;
+ } else {
+ final FilterRelocator fr = new FilterRelocator(filter);
+ filter.visit(fr);
+ return fr.inSegment;
+ }
+ }
+
+ @Override
+ protected void meetNode(QueryModelNode node) {
+ // By default, do not traverse
+ assert node instanceof TupleExpr;
+
+ if (node instanceof UnaryTupleOperator) {
+ if (((UnaryTupleOperator) node).getArg().getBindingNames()
+ .containsAll(filterVars)) {
+ if (stopAtFirstJoin) {
+ ((UnaryTupleOperator) node).getArg().visit(this);
+ } else {
+ inSegment = false;
+ relocate(filter, ((UnaryTupleOperator) node).getArg());
+ }
+ }
+ }
+
+ relocate(filter, (TupleExpr) node);
+ }
+
+ @Override
+ public void meet(Join join) {
+
+ if (stopAtFirstJoin) {
+ isFirstJoinFilter = true;
+ relocate(filter, join);
+ } else {
+
+ if (join.getLeftArg().getBindingNames().containsAll(filterVars)) {
+ // All required vars are bound by the left expr
+ join.getLeftArg().visit(this);
+ } else if (join.getRightArg().getBindingNames()
+ .containsAll(filterVars)) {
+ // All required vars are bound by the right expr
+ join.getRightArg().visit(this);
+ } else {
+ relocate(filter, join);
+ }
+ }
+ }
+
+ @Override
+ public void meet(LeftJoin leftJoin) {
+
+ if (leftJoin.getLeftArg().getBindingNames().containsAll(filterVars)) {
+ inSegment = false;
+ if (stopAtFirstJoin) {
+ leftJoin.getLeftArg().visit(this);
+ } else {
+ relocate(filter, leftJoin.getLeftArg());
+ }
+ } else {
+ relocate(filter, leftJoin);
+ }
+ }
+
+ @Override
+ public void meet(Union union) {
+ final Filter clone = new Filter();
+ clone.setCondition(filter.getCondition().clone());
+
+ relocate(filter, union.getLeftArg());
+ relocate(clone, union.getRightArg());
+
+ inSegment = false;
+
+ }
+
+ @Override
+ public void meet(Difference node) {
+ final Filter clone = new Filter();
+ clone.setCondition(filter.getCondition().clone());
+
+ relocate(filter, node.getLeftArg());
+ relocate(clone, node.getRightArg());
+
+ inSegment = false;
+
+ }
+
+ @Override
+ public void meet(Intersection node) {
+ final Filter clone = new Filter();
+ clone.setCondition(filter.getCondition().clone());
+
+ relocate(filter, node.getLeftArg());
+ relocate(clone, node.getRightArg());
+
+ inSegment = false;
+
+ }
+
+ @Override
+ public void meet(Extension node) {
+ if (node.getArg().getBindingNames().containsAll(filterVars)) {
+ if (stopAtFirstJoin) {
+ node.getArg().visit(this);
+ } else {
+ relocate(filter, node.getArg());
+ inSegment = false;
+ }
+ } else {
+ relocate(filter, node);
+ }
+ }
+
+ @Override
+ public void meet(EmptySet node) {
+ if (filter.getParentNode() != null) {
+ // Remove filter from its original location
+ filter.replaceWith(filter.getArg());
+ }
+ }
+
+ @Override
+ public void meet(Filter filter) {
+ // Filters are commutative
+ filter.getArg().visit(this);
+ }
+
+ @Override
+ public void meet(Distinct node) {
+ node.getArg().visit(this);
+ }
+
+ @Override
+ public void meet(Order node) {
+ node.getArg().visit(this);
+ }
+
+ @Override
+ public void meet(QueryRoot node) {
+ node.getArg().visit(this);
+ }
+
+ @Override
+ public void meet(Reduced node) {
+ node.getArg().visit(this);
+ }
+
+ protected void relocate(Filter filter, TupleExpr newFilterArg) {
+ if (filter.getArg() != newFilterArg) {
+ if (filter.getParentNode() != null) {
+ // Remove filter from its original location
+ filter.replaceWith(filter.getArg());
+ }
+
+ // Insert filter at the new location
+ newFilterArg.replaceWith(filter);
+ filter.setArg(newFilterArg);
+ }
+ }
+ }
+
+ /**
+ * This method determines whether an index node is valid. Criteria for a
+ * valid node are that it has two or more {@link StatementPattern} nodes or
+ * at least one {@link Filter} and one StatementPattern node. Additionally,
+ * every variable that appears in a Filter must also appear in one of the
+ * StatementPatterns of the TupleExpr. Also, this method
+ * calls the {@link ValidQueryVisitor} to determine if the
+ * ExternalTupleSet's TupleExpr contains an invalid node type.
+ *
+ * @param node
+ * -- typically an {@link ExternalTupleSet} index node
+ * @return true if the node satisfies all of the criteria above, false
+ * otherwise
+ */
+ private static boolean isTupleValid(QueryModelNode node) {
+
+ final ValidQueryVisitor vqv = new ValidQueryVisitor();
+ node.visit(vqv);
+
+ if (vqv.isValid() && vqv.getSPs().size() + vqv.getFilters().size() > 1) {
+ if (vqv.getFilters().size() > 0) {
+ final Set<String> spVars = getVarNames(vqv.getSPs());
+ final Set<String> fVarNames = getVarNames(vqv.getFilters());
+ // check that all vars contained in filters also occur in SPs
+ return Sets.intersection(fVarNames, spVars).equals(fVarNames);
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ private static Set<String> getVarNames(Collection<QueryModelNode> nodes) {
+
+ List<String> tempVars;
+ final Set<String> nodeVarNames = Sets.newHashSet();
+
+ for (final QueryModelNode s : nodes) {
+ tempVars = VarCollector.process(s);
+ for (final String t : tempVars) {
+ nodeVarNames.add(t);
+ }
+ }
+ return nodeVarNames;
+ }
+
+ /**
+ * A visitor which checks a TupleExpr associated with an ExternalTupleSet to
+ * determine whether the TupleExpr contains an invalid node.
+ *
+ */
+ private static class ValidQueryVisitor extends
+ QueryModelVisitorBase<RuntimeException> {
+
+ private boolean isValid = true;
+ private final Set<QueryModelNode> filterSet = Sets.newHashSet();
+ private final Set<QueryModelNode> spSet = Sets.newHashSet();
+
+ public Set<QueryModelNode> getFilters() {
+ return filterSet;
+ }
+
+ public Set<QueryModelNode> getSPs() {
+ return spSet;
+ }
+
+ public boolean isValid() {
+ return isValid;
+ }
+
+ @Override
+ public void meet(Projection node) {
+ node.getArg().visit(this);
+ }
+
+ @Override
+ public void meet(Filter node) {
+ filterSet.add(node.getCondition());
+ node.getArg().visit(this);
+ }
+
+ @Override
+ public void meet(StatementPattern node) {
+ spSet.add(node);
+ }
+
+ @Override
+ public void meetNode(QueryModelNode node) {
+
+ if (!(node instanceof Join || node instanceof StatementPattern
+ || node instanceof BindingSetAssignment
+ || node instanceof Var || node instanceof Union || node instanceof LeftJoin)) {
+ isValid = false;
+ return;
+
+ } else {
+ super.meetNode(node);
+ }
+ }
+
+ }
+
+ private static List<ExternalTupleSet> getAccIndices(Configuration conf)
+ throws MalformedQueryException, SailException,
+ QueryEvaluationException, TableNotFoundException,
+ AccumuloException, AccumuloSecurityException, PcjException {
+
+ List<String> tables = null;
+
+ if (conf instanceof RdfCloudTripleStoreConfiguration) {
+ tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables();
+ }
+
+ final String tablePrefix = conf
+ .get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+ final Connector c = ConfigUtils.getConnector(conf);
+ final Map<String, String> indexTables = Maps.newLinkedHashMap();
+ PcjTables pcj = new PcjTables();
+
+ if (tables != null && !tables.isEmpty()) {
+ for (final String table : tables) {
+ indexTables
+ .put(table, pcj.getPcjMetadata(c, table).getSparql());
+ }
+ } else {
+ for (final String table : c.tableOperations().list()) {
+ if (table.startsWith(tablePrefix + "INDEX")) {
+ indexTables.put(table, pcj.getPcjMetadata(c, table)
+ .getSparql());
+ }
+ }
+
+ }
+ final List<ExternalTupleSet> index = Lists.newArrayList();
+
+ if (indexTables.isEmpty()) {
+ System.out.println("No Index found");
+ } else {
+ for (final String table : indexTables.keySet()) {
+ final String indexSparqlString = indexTables.get(table);
+ index.add(new AccumuloIndexSet(indexSparqlString, c, table));
+ }
+ }
+ return index;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
index dda452d..456c465 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.external.tupleSet;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,608 +19,373 @@ package mvm.rya.indexing.external.tupleSet;
* under the License.
*/
-
-
import info.aduna.iteration.CloseableIteration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
-import mvm.rya.accumulo.precompQuery.AccumuloPrecompQueryIndexer;
+import mvm.rya.accumulo.precompQuery.AccumuloPcjQuery;
+import mvm.rya.api.utils.IteratorWrapper;
+import mvm.rya.indexing.PcjQuery;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
-import org.openrdf.model.Literal;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.QueryModelNode;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.impl.EmptyBindingSet;
import org.openrdf.query.parser.ParsedTupleQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;
-import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.SailException;
-import com.beust.jcommander.internal.Sets;
import com.google.common.base.Joiner;
-import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
+import com.google.common.collect.Sets;
+
+/**
+ * During query planning, this node is inserted into the parsed query to
+ * represent part of the original query (a sub-query). This sub-query is the
+ * value returned by {@link ExternalTupleSet#getTupleExpr()}. The results
+ * associated with this sub-query are stored in an external Accumulo table,
+ * where accCon and tablename are the associated {@link Connector} and table
+ * name. During evaluation, the portion of the query in
+ * {@link AccumuloIndexSet} is evaluated by scanning the external Accumulo
+ * table. This class is extremely useful for caching queries and reusing results
+ * from previous SPARQL queries.
+ * <p>
+ *
+ * The {@link TupleExpr} returned by {@link ExternalTupleSet#getTupleExpr()}
+ * may have different variables than the query and variables stored in the
+ * external Accumulo table. The mapping of variables from the TupleExpr to the
+ * table variables is given by {@link ExternalTupleSet#getTableVarMap()}. In
+ * addition to allowing the variables to differ, it is possible for TupleExpr to
+ * have fewer variables than the table query--that is, some of the variables in
+ * the table query may appear as constants in the TupleExpr. These expressions
+ * are extracted from TupleExpr by the method
+ * {@link AccumuloIndexSet#getConstantConstraints()} and by the Visitor
+ * {@link ValueMapVisitor} to be used as constraints when scanning the Accumulo
+ * table. This allows for the pre-computed results to be used for a larger class
+ * of sub-queries.
+ *
+ */
public class AccumuloIndexSet extends ExternalTupleSet implements ExternalBatchingIterator {
- private static final int WRITER_MAX_WRITE_THREADS = 30;
- private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE;
- private static final long WRITER_MAX_MEMORY = 500L * 1024L * 1024L;
- private Map<String,AccValueFactory> bindings;
- private List<String> bindingslist;
- private final Connector accCon;
- private final String tablename;
- private long tableSize = 0;
- private List<String> varOrder = null;
-
-
- public static interface AccValueFactory {
- public org.openrdf.model.Value create(String str);
-
- public String create(org.openrdf.model.Value val);
- }
-
- public static class AccUrlFactory implements AccValueFactory {
- @Override
- public org.openrdf.model.Value create(final String str) {
- return new URIImpl(str);
- }
+ private final Connector accCon; //connector to Accumulo table where results are stored
+ private final String tablename; //name of Accumulo table
+ private List<String> varOrder = null; // orders in which results are written to table
+ private PcjTables pcj = new PcjTables();
- @Override
- public String create(org.openrdf.model.Value val) {
- return val.stringValue();
- }
+ @Override
+ public Map<String, Set<String>> getSupportedVariableOrders() {
+ return this.getSupportedVariableOrderMap();
}
-
- public static class AccValueFactoryImpl implements AccValueFactory {
- @Override
- public org.openrdf.model.Value create(final String str) {
- String[] split = str.split("\u0001");
- if (split.length > 1 && split[1].equals("1")) {
- return new URIImpl(split[0]);
- }
- if (split[0].contains(":")) {
- return new URIImpl(split[0]);
- }
- return new LiteralImpl(split[0]);
- }
- @Override
- public String create(org.openrdf.model.Value val) {
- if (val instanceof URI) {
- return val.stringValue() + "\u0001" + 1;
- }
- if (val instanceof Literal) {
- Literal v = (Literal) val;
- return v.getLabel() + "\u0001" + 2;
- }
- return null;
- }
+ @Override
+ public String getSignature() {
+ return "AccumuloIndexSet(" + tablename + ") : " + Joiner.on(", ").join(this.getTupleExpr().getAssuredBindingNames());
}
-
- //TODO set supportedVarOrderMap
- public AccumuloIndexSet(String sparql, SailRepositoryConnection conn, Connector accCon, String tablename) throws MalformedQueryException, SailException,
- QueryEvaluationException, MutationsRejectedException, TableNotFoundException {
- super(null);
+ /**
+ *
+ * @param sparql - SPARQL query string whose results are stored in the PCJ table
+ * @param accCon - connection to a valid Accumulo instance
+ * @param tablename - name of an existing PCJ table
+ * @throws MalformedQueryException
+ * @throws SailException
+ * @throws QueryEvaluationException
+ * @throws MutationsRejectedException
+ * @throws TableNotFoundException
+ */
+ public AccumuloIndexSet(String sparql, Connector accCon, String tablename) throws MalformedQueryException, SailException, QueryEvaluationException,
+ MutationsRejectedException, TableNotFoundException {
this.tablename = tablename;
this.accCon = accCon;
- SPARQLParser sp = new SPARQLParser();
- ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);
-
+ final SPARQLParser sp = new SPARQLParser();
+ final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);
setProjectionExpr((Projection) pq.getTupleExpr());
- CloseableIteration<BindingSet,QueryEvaluationException> iter = (CloseableIteration<BindingSet,QueryEvaluationException>) conn.getSailConnection()
- .evaluate(getTupleExpr(), null, new EmptyBindingSet(), false);
-
- BatchWriter w = accCon.createBatchWriter(tablename, WRITER_MAX_MEMORY, WRITER_MAX_LATNECY, WRITER_MAX_WRITE_THREADS);
- this.bindingslist = Lists.newArrayList(pq.getTupleExpr().getAssuredBindingNames());
-
- this.bindings = Maps.newHashMap();
-
- pq.getTupleExpr().visit(new QueryModelVisitorBase<RuntimeException>() {
- @Override
- public void meet(Var node) {
- QueryModelNode parent = node.getParentNode();
- if (parent instanceof StatementPattern) {
- StatementPattern statement = (StatementPattern) parent;
- if (node.equals(statement.getSubjectVar())) {
- bindings.put(node.getName(), new AccUrlFactory());
- }
- if (node.equals(statement.getPredicateVar())) {
- bindings.put(node.getName(), new AccUrlFactory());
- }
- if (node.equals(statement.getObjectVar())) {
- bindings.put(node.getName(), new AccValueFactoryImpl());
- }
- if (node.equals(statement.getContextVar())) {
- // TODO is this correct?
- bindings.put(node.getName(), new AccUrlFactory());
- }
- } else if(parent instanceof ValueExpr) {
- bindings.put(node.getName(), new AccValueFactoryImpl());
- }
- };
- });
-
-
-
-
-
- varOrder = new ArrayList<String>(bindingslist.size());
-
- while (iter.hasNext()) {
-
- BindingSet bs = iter.next();
- List<String> shiftBindingList = null;
- for (int j = 0; j < bindingslist.size(); j++) {
- StringBuffer sb = new StringBuffer();
- shiftBindingList = listShift(bindingslist, j); //TODO calling this each time not efficient
- String order = "";
- for (String b : shiftBindingList) {
- String val = bindings.get(b).create(bs.getValue(b));
- sb.append(val).append("\u0000");
- if (order.length() == 0) {
- order = b;
- } else {
- order = order + "\u0000" + b;
- }
- }
-
- if (varOrder.size() < bindingslist.size()) {
- varOrder.add(order);
- }
-
- //System.out.println("String buffer is " + sb);
- Mutation m = new Mutation(sb.deleteCharAt(sb.length() - 1).toString());
- m.put(new Text(varOrder.get(j)), new Text(""), new org.apache.accumulo.core.data.Value(new byte[]{}));
- w.addMutation(m);
- }
- tableSize += 1;
+ Set<VariableOrder> orders = null;
+ try {
+ orders = pcj.getPcjMetadata(accCon, tablename).getVarOrders();
+ } catch (final PcjException e) {
+ e.printStackTrace();
+ }
+
+ varOrder = Lists.newArrayList();
+ for(final VariableOrder var: orders) {
+ varOrder.add(var.toString());
}
-
setLocalityGroups(tablename, accCon, varOrder);
- this.setSupportedVariableOrderMap(createSupportedVarOrderMap(varOrder));
-
-
- String orders = "";
-
- for(String s : varOrder) {
- s = s.replace("\u0000", ";");
- if(orders.length() == 0) {
- orders = s;
- } else {
- orders = orders + "\u0000" + s;
- }
- }
-
-
- Mutation m = new Mutation("~SPARQL");
- Value v = new Value(sparql.getBytes());
- m.put(new Text("" + tableSize), new Text(orders), v);
- w.addMutation(m);
-
- w.close();
- iter.close();
+ this.setSupportedVariableOrderMap(varOrder);
}
-
-
-
+ /**
+ *
+ * @param accCon - connection to a valid Accumulo instance
+ * @param tablename - name of an existing PCJ table
+ * @throws MalformedQueryException
+ * @throws SailException
+ * @throws QueryEvaluationException
+ * @throws MutationsRejectedException
+ * @throws TableNotFoundException
+ */
+ public AccumuloIndexSet(Connector accCon, String tablename)
+ throws MalformedQueryException, SailException,
+ QueryEvaluationException, MutationsRejectedException,
+ TableNotFoundException {
+ PcjMetadata meta = null;
+ try {
+ meta = pcj.getPcjMetadata(accCon, tablename);
+ } catch (final PcjException e) {
+ e.printStackTrace();
+ }
+
+ this.tablename = tablename;
+ this.accCon = accCon;
+ final SPARQLParser sp = new SPARQLParser();
+ final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(meta.getSparql(),
+ null);
+ setProjectionExpr((Projection) pq.getTupleExpr());
+ final Set<VariableOrder> orders = meta.getVarOrders();
+
+ varOrder = Lists.newArrayList();
+ for (final VariableOrder var : orders) {
+ varOrder.add(var.toString());
+ }
+ setLocalityGroups(tablename, accCon, varOrder);
+ this.setSupportedVariableOrderMap(varOrder);
+ }
+
+ /**
+ * returns size of table for query planning
+ */
@Override
- public Map<String, Set<String>> getSupportedVariableOrders() {
-
- return this.getSupportedVariableOrderMap();
-
+ public double cardinality() {
+ double cardinality = 0;
+ try {
+ cardinality = pcj.getPcjMetadata(accCon, tablename).getCardinality();
+ } catch (PcjException e) {
+ e.printStackTrace();
+ }
+ return cardinality;
}
-
-
- @Override
- public boolean supportsBindingSet(Set<String> bindingNames) {
-
- Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders();
- Collection<Set<String>> values = varOrderMap.values();
- Set<String> bNames = Sets.newHashSet();
- for (String s : this.getTupleExpr().getAssuredBindingNames()) {
- if (bindingNames.contains(s)) {
- bNames.add(s);
- }
- }
-
- return values.contains(bNames);
- }
-
-
- private String getVarOrder(Set<String> variables) {
- Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders();
+ /**
+ *
+ * @param tableName
+ * @param conn
+ * @param groups - locality groups to be created
+ *
+ * Sets locality groups for more efficient scans - these are usually the variable
+ * orders in the table so that scans for specific orders are more efficient
+ */
+ private void setLocalityGroups(String tableName, Connector conn, List<String> groups) {
- Set<Map.Entry<String, Set<String>>> entries = varOrderMap.entrySet();
+ final HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>();
+ for (int i = 0; i < groups.size(); i++) {
+ final HashSet<Text> tempColumn = new HashSet<Text>();
+ tempColumn.add(new Text(groups.get(i)));
+ final String groupName = groups.get(i).replace(VAR_ORDER_DELIM, "");
+ localityGroups.put(groupName, tempColumn);
+ }
- for (Map.Entry<String, Set<String>> e : entries) {
+ try {
+ conn.tableOperations().setLocalityGroups(tableName, localityGroups);
+ } catch (AccumuloException | AccumuloSecurityException
+ | TableNotFoundException e) {
+ e.printStackTrace();
+ }
- if (e.getValue().equals(variables)) {
- return e.getKey();
- }
+ }
- }
- return null;
+ @Override
+ public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(BindingSet bindingset) throws QueryEvaluationException {
+ return this.evaluate(Collections.singleton(bindingset));
}
- private String prefixToOrder(String order) {
-
- Map<String, String> invMap = HashBiMap.create(this.getTableVarMap()).inverse();
- String[] temp = order.split("\u0000");
-
- for (int i = 0; i < temp.length; i++) {
- temp[i] = this.getTableVarMap().get(temp[i]);
+ /**
+ * Core evaluation method used during query evaluation - given a collection of binding set constraints, this
+ * method finds common binding labels between the constraints and table, uses those to build a prefix scan
+ * of the Accumulo table, and creates a solution binding set by iterating over the scan results.
+ */
+ @Override
+ public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) throws QueryEvaluationException {
+ String localityGroup = "";
+ final Set<String> commonVars = Sets.newHashSet();
+ // if bindingset is empty, there are no results, so return empty iterator
+ if (bindingset.isEmpty()) {
+ return new IteratorWrapper<BindingSet, QueryEvaluationException>(new HashSet<BindingSet>().iterator());
}
-
- order = Joiner.on("\u0000").join(temp);
-
- for (String s : varOrder) {
- if (s.startsWith(order)) {
-
- temp = s.split("\u0000");
-
- for (int i = 0; i < temp.length; i++) {
- temp[i] = invMap.get(temp[i]);
+ //to build range prefix, find common vars of bindingset and PCJ bindings
+ else {
+ final BindingSet bs = bindingset.iterator().next();
+ for (final String b : this.getTupleExpr().getAssuredBindingNames()) {
+ final Binding v = bs.getBinding(b);
+ if (v != null) {
+ commonVars.add(b);
}
- return Joiner.on("\u0000").join(temp);
}
}
- throw new NoSuchElementException("Order is not a prefix of any locality group value!");
+ //add any constant constraints to common vars to be used in range prefix
+ commonVars.addAll(getConstantConstraints());
+ PcjQuery apq = null;
+ List<String> fullVarOrder = null;
+ String commonVarOrder = null;
+ try {
+ if (commonVars.size() > 0) {
+ commonVarOrder = getVarOrder(commonVars);
+ if(commonVarOrder == null) {
+ throw new IllegalStateException("Index does not support binding set!");
+ }
+ fullVarOrder = Lists.newArrayList(prefixToOrder(commonVarOrder).split(VAR_ORDER_DELIM));
+ //use varOrder and tableVarMap to set correct scan column
+ localityGroup = orderToLocGroup(fullVarOrder);
+ } else {
+ localityGroup = varOrder.get(0);
+ }
+ apq = new AccumuloPcjQuery(accCon, tablename);
+ final ValueMapVisitor vmv = new ValueMapVisitor();
+ this.getTupleExpr().visit(vmv);
+
+ List<String> commonVarOrderList = null;
+ if(commonVarOrder != null) {
+ commonVarOrderList = Lists.newArrayList(commonVarOrder.split(VAR_ORDER_DELIM));
+ } else {
+ commonVarOrderList = new ArrayList<>();
+ }
+
+ return apq.queryPrecompJoin(commonVarOrderList, localityGroup, vmv.getValMap(),
+ HashBiMap.create(this.getTableVarMap()).inverse(), bindingset);
+ } catch(final TableNotFoundException e) {
+ throw new QueryEvaluationException(e);
+ }
}
+ /**
+ *
+ * @param order - variable order as indicated by query
+ * @return - locality group or column family used in scan - this
+ * is just the variable order expressed in terms of the variables stored
+ * in the table
+ */
private String orderToLocGroup(List<String> order) {
String localityGroup = "";
- for (String s : order) {
+ for (final String s : order) {
if (localityGroup.length() == 0) {
localityGroup = this.getTableVarMap().get(s);
} else {
- localityGroup = localityGroup + "\u0000" + this.getTableVarMap().get(s);
+ localityGroup = localityGroup + VAR_ORDER_DELIM + this.getTableVarMap().get(s);
}
}
return localityGroup;
-
}
-
-
- private void setLocalityGroups(String tableName, Connector conn, List<String> groups) {
-
- HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>();
-
-
-
- for (int i = 0; i < groups.size(); i++) {
- HashSet<Text> tempColumn = new HashSet<Text>();
- tempColumn.add(new Text(groups.get(i)));
- String groupName = groups.get(i).replace("\u0000","");
- localityGroups.put(groupName, tempColumn);
- }
-
- try {
- conn.tableOperations().setLocalityGroups(tableName, localityGroups);
- } catch (AccumuloException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (AccumuloSecurityException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (TableNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
-
-
- }
-
-
-
-
-
-
-
- private List<String> listShift(List<String> list, int j) {
-
- if(j >= list.size()) {
- throw new IllegalArgumentException();
- }
-
- List<String> shiftList = Lists.newArrayList();
- for(int i=0; i<list.size(); i++) {
- shiftList.add(list.get((i+j)%list.size()));
+ /**
+ *
+ * @param order - prefix of a full variable order
+ * @return - full variable order that includes all variables whose values
+ * are stored in the table - used to obtain the locality group
+ */
+ //given partial order of query vars, convert to PCJ vars and determine
+ //if converted partial order is a prefix of a full var order of PCJ variables.
+ //if converted partial order is a prefix, convert corresponding full PCJ var order to query vars
+ private String prefixToOrder(String order) {
+ final Map<String, String> invMap = HashBiMap.create(this.getTableVarMap()).inverse();
+ String[] temp = order.split(VAR_ORDER_DELIM);
+ //get order in terms of PCJ variables
+ for (int i = 0; i < temp.length; i++) {
+ temp[i] = this.getTableVarMap().get(temp[i]);
}
-
- return shiftList;
- }
-
-
-
- private Set<String> getConstantConstraints() {
-
- Map<String, String> tableMap = this.getTableVarMap();
- Set<String> keys = tableMap.keySet();
- Set<String> constants = Sets.newHashSet();
-
- for (String s : keys) {
- if (s.startsWith("-const-")) {
- constants.add(s);
+ order = Joiner.on(VAR_ORDER_DELIM).join(temp);
+ for (final String s : varOrder) {
+ //verify that partial order is prefix of a PCJ varOrder
+ if (s.startsWith(order)) {
+ temp = s.split(VAR_ORDER_DELIM);
+ //convert full PCJ varOrder back to query varOrder
+ for (int i = 0; i < temp.length; i++) {
+ temp[i] = invMap.get(temp[i]);
+ }
+ return Joiner.on(VAR_ORDER_DELIM).join(temp);
}
-
}
-
- return constants;
-
- }
-
-
-
-
- public AccumuloIndexSet(String sparql, Connector accCon, String tablename) throws MalformedQueryException, SailException, QueryEvaluationException,
- MutationsRejectedException, TableNotFoundException {
- super(null);
- this.tablename = tablename;
- this.accCon = accCon;
- SPARQLParser sp = new SPARQLParser();
- ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);
-
- setProjectionExpr((Projection) pq.getTupleExpr());
-
- this.bindingslist = Lists.newArrayList(pq.getTupleExpr().getAssuredBindingNames());
-
- this.bindings = Maps.newHashMap();
- pq.getTupleExpr().visit(new QueryModelVisitorBase<RuntimeException>() {
- @Override
- public void meet(Var node) {
- QueryModelNode parent = node.getParentNode();
- if (parent instanceof StatementPattern) {
- StatementPattern statement = (StatementPattern) parent;
- if (node.equals(statement.getSubjectVar())) {
- bindings.put(node.getName(), new AccUrlFactory());
- }
- if (node.equals(statement.getPredicateVar())) {
- bindings.put(node.getName(), new AccUrlFactory());
- }
- if (node.equals(statement.getObjectVar())) {
- bindings.put(node.getName(), new AccValueFactoryImpl());
- }
- if (node.equals(statement.getContextVar())) {
- // TODO is this correct?
- bindings.put(node.getName(), new AccUrlFactory());
- }
- } else if(parent instanceof ValueExpr) {
- bindings.put(node.getName(), new AccValueFactoryImpl());
- }
- };
- });
-
-
-
-
- Scanner s = accCon.createScanner(tablename, new Authorizations());
- s.setRange(Range.exact(new Text("~SPARQL")));
- Iterator<Entry<Key,Value>> i = s.iterator();
-
- String[] tempVarOrders = null;
-
- if (i.hasNext()) {
- Entry<Key, Value> entry = i.next();
- Text ts = entry.getKey().getColumnFamily();
- tempVarOrders = entry.getKey().getColumnQualifier().toString().split("\u0000");
- tableSize = Long.parseLong(ts.toString());
-
- } else {
- throw new IllegalStateException("Index table contains no metadata!");
- }
-
-
- varOrder = Lists.newArrayList();
-
- for(String t: tempVarOrders) {
- t = t.replace(";","\u0000");
- varOrder.add(t);
- }
-
- setLocalityGroups(tablename, accCon, varOrder);
- this.setSupportedVariableOrderMap(createSupportedVarOrderMap(varOrder));
-
+ throw new NoSuchElementException("Order is not a prefix of any locality group value!");
}
-
-
-
- private Map<String, Set<String>> createSupportedVarOrderMap(List<String> orders) {
-
- Map<String, Set<String>> supportedVars = Maps.newHashMap();
-
- for (String t : orders) {
-
- String[] tempOrder = t.split("\u0000");
- Set<String> varSet = Sets.newHashSet();
- String u = "";
-
- for (String s : tempOrder) {
- if(u.length() == 0) {
- u = s;
- } else{
- u = u+ "\u0000" + s;
- }
- varSet.add(s);
- supportedVars.put(u, new HashSet<String>(varSet));
-
+ /**
+ *
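+ * @param variables - set of variable names to look up among the supported variable orders
+ * @return - the supported variable order (map key) whose variable set equals the given
+ * set, or null if no supported order matches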
+ */
+ private String getVarOrder(Set<String> variables) {
+ final Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders();
+ final Set<Map.Entry<String, Set<String>>> entries = varOrderMap.entrySet();
+ for (final Map.Entry<String, Set<String>> e : entries) {
+ if (e.getValue().equals(variables)) {
+ return e.getKey();
}
-
}
-
- return supportedVars;
- }
-
-
-
- @Override
- public void setProjectionExpr(Projection tupleExpr) {
- super.setProjectionExpr(tupleExpr);
- this.bindingslist = Lists.newArrayList(tupleExpr.getAssuredBindingNames());
-
- this.bindings = Maps.newHashMap();
- tupleExpr.visit(new QueryModelVisitorBase<RuntimeException>() {
- @Override
- public void meet(Var node) {
- QueryModelNode parent = node.getParentNode();
- if (parent instanceof StatementPattern) {
- StatementPattern statement = (StatementPattern) parent;
- if (node.equals(statement.getSubjectVar())) {
- bindings.put(node.getName(), new AccUrlFactory());
- }
- if (node.equals(statement.getPredicateVar())) {
- bindings.put(node.getName(), new AccUrlFactory());
- }
- if (node.equals(statement.getObjectVar())) {
- bindings.put(node.getName(), new AccValueFactoryImpl());
- }
- if (node.equals(statement.getContextVar())) {
- // TODO is this correct?
- bindings.put(node.getName(), new AccUrlFactory());
- }
- } else if (parent instanceof ValueExpr) { //Add bindings associated with Filters
- bindings.put(node.getName(), new AccValueFactoryImpl());
- }
- };
- });
-
- }
-
- @Override
- public String getSignature() {
- return "AccumuloIndexSet(" + tablename + ") : " + Joiner.on(", ").join(bindingslist);
- }
-
- @Override
- public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(BindingSet bindingset) throws QueryEvaluationException {
- return this.evaluate(Collections.singleton(bindingset));
- }
-
- @Override
- public double cardinality() {
- return tableSize;
+ return null;
}
- @Override
- public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) throws QueryEvaluationException {
-
- String localityGroup = "";
- Set<String> commonVars = Sets.newHashSet();
-
- if (!bindingset.isEmpty()) {
-
- BindingSet bs = bindingset.iterator().next();
- for (String b : bindingslist) {
- Binding v = bs.getBinding(b);
- if (v != null) {
- commonVars.add(b);
- }
-
- }
- }
-
- commonVars.addAll(getConstantConstraints());
- AccumuloPrecompQueryIndexer apq = null;
- List<String> fullVarOrder = null;
- try {
-
- if (commonVars.size() > 0) {
- String commonVarOrder = getVarOrder(commonVars);
- if(commonVarOrder == null) {
- throw new IllegalStateException("Index does not support binding set!");
- }
- fullVarOrder = Lists.newArrayList(prefixToOrder(commonVarOrder).split("\u0000"));
- localityGroup = orderToLocGroup(fullVarOrder);
- fullVarOrder.add("" + commonVars.size());
-
- } else {
- fullVarOrder = bindingslist;
- localityGroup = orderToLocGroup(fullVarOrder);
- fullVarOrder.add("" + 0);
+ /**
+ * @return - all constraints which correspond to variables
+ * in {@link AccumuloIndexSet#getTupleExpr()} which are set
+ * equal to a constant, but are non-constant in Accumulo table
+ */
+ private Set<String> getConstantConstraints() {
+ final Map<String, String> tableMap = this.getTableVarMap();
+ final Set<String> keys = tableMap.keySet();
+ final Set<String> constants = Sets.newHashSet();
+ for (final String s : keys) {
+ if (s.startsWith("-const-")) {
+ constants.add(s);
}
-
-
- apq = new AccumuloPrecompQueryIndexer(accCon, tablename);
- ValueMapVisitor vmv = new ValueMapVisitor();
- this.getTupleExpr().visit(vmv);
-
- return apq.queryPrecompJoin(fullVarOrder, localityGroup, this.bindings, vmv.getValMap(), bindingset);
-
- } catch(TableNotFoundException e) {
- throw new QueryEvaluationException(e);
- } finally {
- IOUtils.closeQuietly(apq);
}
+ return constants;
}
-
-
- public class ValueMapVisitor extends QueryModelVisitorBase<RuntimeException> {
+ /**
+ *
+ * Extracts the values associated with constant labels in the query.
+ * Used to create binding sets from range scan.
+ */
+ public class ValueMapVisitor extends QueryModelVisitorBase<RuntimeException> {
Map<String, org.openrdf.model.Value> valMap = Maps.newHashMap();
-
-
public Map<String, org.openrdf.model.Value> getValMap() {
return valMap;
}
-
@Override
public void meet(Var node) {
if (node.getName().startsWith("-const-")) {
valMap.put(node.getName(), node.getValue());
}
-
}
-
}
-
-
+
}
-
+
+