You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rya.apache.org by mi...@apache.org on 2015/12/07 13:05:07 UTC
[37/51] [partial] incubator-rya git commit: Cannot delete temp branch,
doc'd it.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
deleted file mode 100644
index feb894f..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
+++ /dev/null
@@ -1,450 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE;
-import info.aduna.iteration.CloseableIteration;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.documentIndex.DocIndexIteratorUtil;
-import mvm.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaToRdfConversions;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.indexing.DocIdIndexer;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-import org.openrdf.query.algebra.helpers.StatementPatternCollector;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Bytes;
-
-/**
- * {@link DocIdIndexer} implementation that evaluates star queries against the Accumulo
- * entity table by attaching a {@link DocumentIndexIntersectingIterator} to a batch scan.
- */
-public class AccumuloDocIdIndexer implements DocIdIndexer {
-
-    private final AccumuloRdfConfiguration conf;
-
-    /**
-     * @param conf configuration used to obtain the connector, table name and authorizations;
-     *            must be an {@link AccumuloRdfConfiguration}
-     * @throws IllegalArgumentException if {@code conf} is null or not an {@link AccumuloRdfConfiguration}
-     */
-    public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException {
-        // The cast below needs the Accumulo-specific subclass, so that is the type that has to be verified.
-        Preconditions.checkArgument(conf instanceof AccumuloRdfConfiguration,
-                "conf must be an instance of AccumuloRdfConfiguration");
-        this.conf = (AccumuloRdfConfiguration) conf;
-    }
-
-    /**
-     * Parses {@code sparqlQuery}, verifies that its statement patterns form a star query and
-     * evaluates it through {@link #queryDocIndex(StarQuery, Collection)}.
-     *
-     * @param sparqlQuery SPARQL text to parse
-     * @param constraints binding sets to join with the query results (may be empty)
-     * @return iteration over the solutions; the caller must close it
-     * @throws QueryEvaluationException if the query cannot be parsed or the scan cannot be created
-     * @throws IllegalArgumentException if the parsed query is not a valid star query
-     */
-    public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery,
-            Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
-
-        final ParsedQuery parsedQuery;
-        try {
-            parsedQuery = new SPARQLParser().parseQuery(sparqlQuery, null);
-        } catch (MalformedQueryException e) {
-            // Previously the stack trace was printed and a null query was dereferenced right after.
-            throw new QueryEvaluationException("Unable to parse SPARQL query: " + sparqlQuery, e);
-        }
-
-        TupleExpr tupleExpr = parsedQuery.getTupleExpr();
-        List<StatementPattern> patterns = StatementPatternCollector.process(tupleExpr);
-
-        if (!StarQuery.isValidStarQuery(patterns)) {
-            throw new IllegalArgumentException("Invalid star query!");
-        }
-        return queryDocIndex(new StarQuery(patterns), constraints);
-    }
-
-    /**
-     * Evaluates a star query, joining its results with the supplied binding sets.
-     *
-     * <p>Two strategies are used. When the first binding set shares exactly one variable with
-     * the query and that variable is the query's (non-constant) common variable, a single batch
-     * scan is issued with one {@link Range} per bound row value and every returned key is
-     * combined with all binding sets registered for its row. Otherwise the query is constrained
-     * with each binding set in turn and one scan is run per binding set (or a single
-     * unconstrained scan when {@code constraints} is empty).
-     *
-     * <p>Note that the common/uncommon variable names are derived from the first binding set only.
-     *
-     * @param query       star query to evaluate
-     * @param constraints binding sets to join with the query results (may be empty)
-     * @return iteration over the solutions; the caller must close it to release the scanner
-     * @throws QueryEvaluationException if a scan cannot be created or a key cannot be decoded
-     */
-    @Override
-    public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query,
-            Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
-
-        final StarQuery starQ = query;
-        final Iterator<BindingSet> bs = constraints.iterator();
-        final Iterator<BindingSet> bs2 = constraints.iterator();
-        final Set<String> unCommonVarNames;
-        final Set<String> commonVarNames;
-        if (bs2.hasNext()) {
-            BindingSet currBs = bs2.next();
-            commonVarNames = StarQuery.getCommonVars(query, currBs);
-            unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames);
-        } else {
-            commonVarNames = Sets.newHashSet();
-            unCommonVarNames = Sets.newHashSet();
-        }
-
-        if (commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) {
-
-            // row value -> every binding set that binds the common variable to that row
-            final HashMultimap<String, BindingSet> map = HashMultimap.create();
-            final String commonVar = starQ.getCommonVarName();
-            final Iterator<Entry<Key, Value>> intersections;
-            final BatchScanner scan;
-            Set<Range> ranges = Sets.newHashSet();
-
-            while (bs.hasNext()) {
-
-                BindingSet currentBs = bs.next();
-
-                if (currentBs.getBinding(commonVar) == null) {
-                    continue;
-                }
-
-                String row = currentBs.getBinding(commonVar).getValue().stringValue();
-                ranges.add(new Range(row));
-                map.put(row, currentBs);
-
-            }
-            scan = runQuery(starQ, ranges);
-            intersections = scan.iterator();
-
-            return new CloseableIteration<BindingSet, QueryEvaluationException>() {
-
-                private QueryBindingSet currentSolutionBs = null;
-                // true while a solution found by hasNext() has not yet been handed out by next()
-                private boolean hasNextCalled = false;
-                private boolean isEmpty = false;
-                private Iterator<BindingSet> inputSet = (new ArrayList<BindingSet>()).iterator();
-                private BindingSet currentBs;
-                private Key key;
-
-                @Override
-                public boolean hasNext() throws QueryEvaluationException {
-                    if (!hasNextCalled && !isEmpty) {
-                        while (inputSet.hasNext() || intersections.hasNext()) {
-                            if (!inputSet.hasNext()) {
-                                // Binding sets for the current key are used up: advance to the next key.
-                                key = intersections.next().getKey();
-                                inputSet = map.get(key.getRow().toString()).iterator();
-                            }
-                            currentBs = inputSet.next();
-                            currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames);
-
-                            // Accept only solutions in which every expected variable received a binding.
-                            if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() + 1) {
-                                hasNextCalled = true;
-                                return true;
-                            }
-
-                        }
-
-                        isEmpty = true;
-                        return false;
-
-                    } else if (isEmpty) {
-                        return false;
-                    } else {
-                        return true;
-                    }
-
-                }
-
-                @Override
-                public BindingSet next() throws QueryEvaluationException {
-
-                    if (hasNextCalled) {
-                        hasNextCalled = false;
-                    } else if (isEmpty) {
-                        throw new NoSuchElementException();
-                    } else {
-                        if (this.hasNext()) {
-                            hasNextCalled = false;
-                        } else {
-                            throw new NoSuchElementException();
-                        }
-                    }
-
-                    return currentSolutionBs;
-                }
-
-                @Override
-                public void remove() throws QueryEvaluationException {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public void close() throws QueryEvaluationException {
-                    scan.close();
-                }
-
-            };
-
-        } else {
-
-            return new CloseableIteration<BindingSet, QueryEvaluationException>() {
-
-                @Override
-                public void remove() throws QueryEvaluationException {
-                    throw new UnsupportedOperationException();
-                }
-
-                private Iterator<Entry<Key, Value>> intersections = null;
-                private QueryBindingSet currentSolutionBs = null;
-                // true while a solution found by hasNext() has not yet been handed out by next()
-                private boolean hasNextCalled = false;
-                private boolean isEmpty = false;
-                private boolean init = false;
-                private BindingSet currentBs;
-                private StarQuery sq = new StarQuery(starQ);
-                private Set<Range> emptyRangeSet = Sets.newHashSet();
-                // Created lazily by the first hasNext() call; stays null until then.
-                private BatchScanner scan;
-
-                @Override
-                public BindingSet next() throws QueryEvaluationException {
-                    if (hasNextCalled) {
-                        hasNextCalled = false;
-                    } else if (isEmpty) {
-                        throw new NoSuchElementException();
-                    } else {
-                        if (this.hasNext()) {
-                            hasNextCalled = false;
-                        } else {
-                            throw new NoSuchElementException();
-                        }
-                    }
-                    return currentSolutionBs;
-                }
-
-                @Override
-                public boolean hasNext() throws QueryEvaluationException {
-
-                    if (!init) {
-                        if (intersections == null && bs.hasNext()) {
-                            currentBs = bs.next();
-                            sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
-                            scan = runQuery(sq, emptyRangeSet);
-                            intersections = scan.iterator();
-                            // binding set empty
-                        } else if (intersections == null && !bs.hasNext()) {
-                            currentBs = new QueryBindingSet();
-                            scan = runQuery(starQ, emptyRangeSet);
-                            intersections = scan.iterator();
-                        }
-
-                        init = true;
-                    }
-
-                    if (!hasNextCalled && !isEmpty) {
-                        while (intersections.hasNext() || bs.hasNext()) {
-                            if (!intersections.hasNext()) {
-                                // Current scan is exhausted: release it and scan for the next binding set.
-                                scan.close();
-                                currentBs = bs.next();
-                                // NOTE(review): this constrains the already-constrained sq rather than
-                                // starQ; kept as in the original -- confirm against StarQuery semantics.
-                                sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
-                                scan = runQuery(sq, emptyRangeSet);
-                                intersections = scan.iterator();
-                            }
-                            if (intersections.hasNext()) {
-                                currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs,
-                                        unCommonVarNames);
-                            } else {
-                                continue;
-                            }
-
-                            // A constant common variable contributes no binding, hence the two expected sizes.
-                            if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) {
-                                hasNextCalled = true;
-                                return true;
-                            } else if (currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) {
-                                hasNextCalled = true;
-                                return true;
-                            }
-                        }
-
-                        isEmpty = true;
-                        return false;
-
-                    } else if (isEmpty) {
-                        return false;
-                    } else {
-                        return true;
-                    }
-                }
-
-                @Override
-                public void close() throws QueryEvaluationException {
-                    // The scanner only exists once hasNext() has run; closing an unused iteration must not fail.
-                    if (scan != null) {
-                        scan.close();
-                    }
-                }
-            };
-        }
-    }
-
-    /**
-     * Builds a solution from an index key: binds the query's common variable from the row,
-     * binds each uncommon variable from its slot of the column qualifier, and copies the
-     * {@code unCommonVar} bindings over from {@code currentBs}.
-     *
-     * @throws QueryEvaluationException if a serialized value cannot be resolved to an RDF value
-     * @throws IllegalArgumentException if a column qualifier slot is neither "object" nor "subject"
-     */
-    private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar)
-            throws QueryEvaluationException {
-
-        QueryBindingSet currentSolutionBs = new QueryBindingSet();
-
-        Text row = key.getRow();
-        Text cq = key.getColumnQualifier();
-
-        String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);
-
-        // if common Var is constant there is no common variable to assign a value to
-        boolean commonVarSet = sq.commonVarConstant();
-
-        if (!commonVarSet && sq.isCommonVarURI()) {
-            RyaURI rURI = new RyaURI(row.toString());
-            currentSolutionBs.addBinding(sq.getCommonVarName(),
-                    RyaToRdfConversions.convertValue(rURI));
-            commonVarSet = true;
-        }
-
-        for (String s : sq.getUnCommonVars()) {
-
-            // Slot layout as sliced below: <...> DELIM <component> DELIM <content> TYPE_DELIM <type>
-            byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes();
-            int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE);
-            int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE);
-            int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE);
-            byte[] tripleComponent = Arrays.copyOfRange(cqBytes, firstIndex + 1, secondIndex);
-            byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex);
-            byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length);
-
-            String component = new String(tripleComponent);
-            if (component.equals("object")) {
-                currentSolutionBs.addBinding(s, toValue(Bytes.concat(cqContent, objType)));
-
-            } else if (component.equals("subject")) {
-                if (!commonVarSet) {
-                    // The row holds the object here, so it is typed with the slot's type suffix.
-                    currentSolutionBs.addBinding(sq.getCommonVarName(),
-                            toValue(Bytes.concat(row.getBytes(), objType)));
-                    commonVarSet = true;
-                }
-                RyaURI rURI = new RyaURI(new String(cqContent));
-                currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI));
-            } else {
-                throw new IllegalArgumentException("Invalid row.");
-            }
-        }
-        for (String s : unCommonVar) {
-            currentSolutionBs.addBinding(s, currentBs.getValue(s));
-        }
-        return currentSolutionBs;
-    }
-
-    /**
-     * Deserializes a typed Rya value and converts it to an RDF value, reporting resolver
-     * failures instead of silently producing a null binding.
-     */
-    private org.openrdf.model.Value toValue(byte[] serialized) throws QueryEvaluationException {
-        try {
-            return RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(serialized));
-        } catch (RyaTypeResolverException e) {
-            throw new QueryEvaluationException(e);
-        }
-    }
-
-    /**
-     * Creates a batch scanner over the entity table configured for {@code query}.
-     *
-     * @param query  star query supplying column conditions and optional context
-     * @param ranges rows to scan; when empty, the query's common variable value is used as the
-     *               single row, or the whole table if that value is null
-     * @return an open scanner that the caller is responsible for closing
-     * @throws QueryEvaluationException wrapping any Accumulo failure while creating the scanner
-     */
-    private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException {
-
-        try {
-            if (ranges.isEmpty()) {
-                String rangeText = query.getCommonVarValue();
-                Range r;
-                if (rangeText != null) {
-                    r = new Range(new Text(rangeText));
-                } else {
-                    r = new Range();
-                }
-                ranges = Collections.singleton(r);
-            }
-
-            Connector accCon = ConfigUtils.getConnector(conf);
-            IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class);
-
-            DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond());
-
-            if (query.hasContext()) {
-                DocumentIndexIntersectingIterator.setContext(is, query.getContextURI());
-            }
-            // Kept local: every returned iteration owns and closes its own scanner.
-            BatchScanner scanner = accCon.createBatchScanner(ConfigUtils.getEntityTableName(conf),
-                    new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15);
-            scanner.addScanIterator(is);
-            scanner.setRanges(ranges);
-
-            return scanner;
-
-        } catch (TableNotFoundException e) {
-            throw new QueryEvaluationException(e);
-        } catch (AccumuloException e) {
-            throw new QueryEvaluationException(e);
-        } catch (AccumuloSecurityException e) {
-            throw new QueryEvaluationException(e);
-        }
-    }
-
-    /**
-     * No-op: scanners are owned and closed by the iterations returned from
-     * {@code queryDocIndex}.
-     */
-    @Override
-    public void close() throws IOException {
-        //TODO generate an exception when BS passed in -- scanner closed
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java
deleted file mode 100644
index b8b3f65..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java
+++ /dev/null
@@ -1,252 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
-import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
-import mvm.rya.accumulo.experimental.AccumuloIndexer;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.openrdf.model.Statement;
-import org.openrdf.query.algebra.evaluation.QueryOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.BindingAssigner;
-import org.openrdf.query.algebra.evaluation.impl.CompareOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.ConjunctiveConstraintSplitter;
-import org.openrdf.query.algebra.evaluation.impl.ConstantOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.DisjunctiveConstraintOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.FilterOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.IterativeEvaluationOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.OrderLimitOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.QueryModelNormalizer;
-import org.openrdf.query.algebra.evaluation.impl.SameTermFilterOptimizer;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Bytes;
-
-public class EntityCentricIndex extends AbstractAccumuloIndexer {
-
- private static final Logger logger = Logger.getLogger(EntityCentricIndex.class);
- private static final String TABLE_SUFFIX = "EntityCentricIndex";
-
- private AccumuloRdfConfiguration conf;
- private BatchWriter writer;
- private boolean isInit = false;
-
- public static final String CONF_TABLE_SUFFIX = "ac.indexer.eci.tablename";
-
-
- private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException,
- TableExistsException {
- ConfigUtils.createTableIfNotExists(conf, ConfigUtils.getEntityTableName(conf));
- }
-
-
- @Override
- public Configuration getConf() {
- return this.conf;
- }
-
- //initialization occurs in setConf because index is created using reflection
- @Override
- public void setConf(Configuration conf) {
- if (conf instanceof AccumuloRdfConfiguration) {
- this.conf = (AccumuloRdfConfiguration) conf;
- } else {
- this.conf = new AccumuloRdfConfiguration(conf);
- }
- if (!isInit) {
- try {
- init();
- isInit = true;
- } catch (AccumuloException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (AccumuloSecurityException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (TableNotFoundException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (TableExistsException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (IOException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- }
- }
- }
-
-
- @Override
- public String getTableName() {
- return ConfigUtils.getEntityTableName(conf);
- }
-
- @Override
- public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
- try {
- this.writer = writer.getBatchWriter(getTableName());
- } catch (AccumuloException e) {
- throw new IOException(e);
- } catch (AccumuloSecurityException e) {
- throw new IOException(e);
- } catch (TableNotFoundException e) {
- throw new IOException(e);
- }
-
- }
-
-
- public void storeStatement(RyaStatement stmt) throws IOException {
- Preconditions.checkNotNull(writer, "BatchWriter not Set");
- try {
- for (TripleRow row : serializeStatement(stmt)) {
- writer.addMutation(createMutation(row));
- }
- } catch (MutationsRejectedException e) {
- throw new IOException(e);
- } catch (RyaTypeResolverException e) {
- throw new IOException(e);
- }
- }
-
-
- public void deleteStatement(RyaStatement stmt) throws IOException {
- Preconditions.checkNotNull(writer, "BatchWriter not Set");
- try {
- for (TripleRow row : serializeStatement(stmt)) {
- writer.addMutation(deleteMutation(row));
- }
- } catch (MutationsRejectedException e) {
- throw new IOException(e);
- } catch (RyaTypeResolverException e) {
- throw new IOException(e);
- }
- }
-
-
- protected Mutation deleteMutation(TripleRow tripleRow) {
- Mutation m = new Mutation(new Text(tripleRow.getRow()));
-
- byte[] columnFamily = tripleRow.getColumnFamily();
- Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
-
- byte[] columnQualifier = tripleRow.getColumnQualifier();
- Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
-
- m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), tripleRow.getTimestamp());
- return m;
- }
-
- public static Collection<Mutation> createMutations(RyaStatement stmt) throws RyaTypeResolverException{
- Collection<Mutation> m = Lists.newArrayList();
- for (TripleRow tr : serializeStatement(stmt)){
- m.add(createMutation(tr));
- }
- return m;
- }
-
- private static Mutation createMutation(TripleRow tripleRow) {
- Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
- byte[] columnVisibility = tripleRow.getColumnVisibility();
- ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
- Long timestamp = tripleRow.getTimestamp();
- byte[] value = tripleRow.getValue();
- Value v = value == null ? EMPTY_VALUE : new Value(value);
- byte[] columnQualifier = tripleRow.getColumnQualifier();
- Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
- byte[] columnFamily = tripleRow.getColumnFamily();
- Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
-
- mutation.put(cfText, cqText, cv, timestamp, v);
- return mutation;
- }
-
- private static List<TripleRow> serializeStatement(RyaStatement stmt) throws RyaTypeResolverException {
- RyaURI subject = stmt.getSubject();
- RyaURI predicate = stmt.getPredicate();
- RyaType object = stmt.getObject();
- RyaURI context = stmt.getContext();
- Long timestamp = stmt.getTimestamp();
- byte[] columnVisibility = stmt.getColumnVisibility();
- byte[] value = stmt.getValue();
- assert subject != null && predicate != null && object != null;
- byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes();
- byte[] subjBytes = subject.getData().getBytes();
- byte[] predBytes = predicate.getData().getBytes();
- byte[][] objBytes = RyaContext.getInstance().serializeType(object);
-
- return Lists.newArrayList(new TripleRow(subjBytes, //
- predBytes, //
- Bytes.concat(cf, DELIM_BYTES, //
- "object".getBytes(), DELIM_BYTES, //
- objBytes[0], objBytes[1]), //
- timestamp, //
- columnVisibility, //
- value//
- ),
-
- new TripleRow(objBytes[0], //
- predBytes, //
- Bytes.concat(cf, DELIM_BYTES, //
- "subject".getBytes(), DELIM_BYTES, //
- subjBytes, objBytes[1]), //
- timestamp, //
- columnVisibility, //
- value//
- ));
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
deleted file mode 100644
index 2030e58..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-
-/**
- * Reads the predicates recorded in the {@code <tablePrefix>prospects} table and assigns each
- * of them its own locality group on the {@code <tablePrefix>doc_partitioned_index} table.
- */
-public class EntityLocalityGroupSetter {
-
-    String tablePrefix;
-    Connector conn;
-    Configuration conf;
-
-    /**
-     * @param tablePrefix prefix prepended to the prospects and index table names
-     * @param conn        connector used for scanning and for table operations
-     * @param conf        configuration supplying the scan authorizations
-     */
-    public EntityLocalityGroupSetter(String tablePrefix, Connector conn, Configuration conf) {
-        this.conn = conn;
-        this.tablePrefix = tablePrefix;
-        this.conf = conf;
-    }
-
-    /**
-     * Scans the prospects table for rows starting with {@code "predicate\u0000"} and returns
-     * the distinct second row fields. The scan is fully consumed and the scanner closed before
-     * this method returns.
-     *
-     * @throws IllegalStateException if the prospects table does not exist
-     */
-    private Iterator<String> getPredicates() {
-
-        String auths = conf.get(ConfigUtils.CLOUDBASE_AUTHS);
-        final BatchScanner bs;
-        try {
-            bs = conn.createBatchScanner(tablePrefix + "prospects", new Authorizations(auths), 10);
-        } catch (TableNotFoundException e) {
-            // Continuing with a null scanner would only fail later with a bare NullPointerException.
-            throw new IllegalStateException("Table " + tablePrefix + "prospects" + " not found", e);
-        }
-
-        // Insertion-ordered and duplicate-free: a predicate seen on several rows must end up in
-        // exactly one locality group.
-        Set<String> predicates = new java.util.LinkedHashSet<String>();
-        try {
-            bs.setRanges(Collections.singleton(Range.prefix(new Text("predicate" + "\u0000"))));
-            Iterator<Entry<Key,Value>> iter = bs.iterator();
-            while (iter.hasNext()) {
-                String row = iter.next().getKey().getRow().toString();
-                String[] rowArray = row.split("\u0000");
-                // split() drops trailing empty fields, so a row without a predicate has one element.
-                if (rowArray.length > 1) {
-                    predicates.add(rowArray[1]);
-                }
-            }
-        } finally {
-            bs.close();
-        }
-        return predicates.iterator();
-    }
-
-    /**
-     * Creates one locality group per predicate, named {@code predicate1}, {@code predicate2},
-     * ..., each containing that predicate as its only column family, and applies them to the
-     * index table. Accumulo failures while applying the groups are printed and otherwise
-     * ignored, as before.
-     */
-    public void setLocalityGroups() {
-
-        HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>();
-        Iterator<String> groups = getPredicates();
-
-        int i = 1;
-
-        while (groups.hasNext()) {
-            HashSet<Text> tempColumn = new HashSet<Text>();
-            String temp = groups.next();
-            tempColumn.add(new Text(temp));
-            String groupName = "predicate" + i;
-            localityGroups.put(groupName, tempColumn);
-            i++;
-        }
-
-        try {
-            conn.tableOperations().setLocalityGroups(tablePrefix + "doc_partitioned_index", localityGroups);
-            //conn.tableOperations().compact(tablePrefix + "doc_partitioned_index", null, null, true, true);
-        } catch (AccumuloException e) {
-            e.printStackTrace();
-        } catch (AccumuloSecurityException e) {
-            e.printStackTrace();
-        } catch (TableNotFoundException e) {
-            e.printStackTrace();
-        }
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java
deleted file mode 100644
index e46c321..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java
+++ /dev/null
@@ -1,436 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.persist.RdfEvalStatsDAO;
-import mvm.rya.api.persist.joinselect.SelectivityEvalDAO;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.joinselect.AccumuloSelectivityEvalDAO;
-import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO;
-import mvm.rya.rdftriplestore.inference.DoNotExpandSP;
-import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.Dataset;
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.algebra.Join;
-import org.openrdf.query.algebra.QueryModelNode;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.query.algebra.Var;
-import org.openrdf.query.algebra.evaluation.QueryOptimizer;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class EntityOptimizer implements QueryOptimizer, Configurable {
-
- private SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval;
- private RdfCloudTripleStoreConfiguration conf;
- private boolean isEvalDaoSet = false;
-
-
- public EntityOptimizer() {
-
- }
-
- public EntityOptimizer(RdfCloudTripleStoreConfiguration conf) {
- if(conf.isUseStats() && conf.isUseSelectivity()) {
- try {
- eval = new AccumuloSelectivityEvalDAO(conf, ConfigUtils.getConnector(conf));
- ((AccumuloSelectivityEvalDAO)eval).setRdfEvalDAO(new ProspectorServiceEvalStatsDAO(ConfigUtils.getConnector(conf), conf));
- eval.init();
- } catch (AccumuloException e) {
- e.printStackTrace();
- } catch (AccumuloSecurityException e) {
- e.printStackTrace();
- }
-
- isEvalDaoSet = true;
- } else {
- eval = null;
- isEvalDaoSet = true;
- }
- this.conf = conf;
- }
-
- public EntityOptimizer(SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval) {
- this.eval = eval;
- this.conf = eval.getConf();
- isEvalDaoSet = true;
- }
-
- @Override
- public void setConf(Configuration conf) {
- if(conf instanceof RdfCloudTripleStoreConfiguration) {
- this.conf = (RdfCloudTripleStoreConfiguration) conf;
- } else {
- this.conf = new AccumuloRdfConfiguration(conf);
- }
-
- if (!isEvalDaoSet) {
- if(this.conf.isUseStats() && this.conf.isUseSelectivity()) {
- try {
- eval = new AccumuloSelectivityEvalDAO(this.conf, ConfigUtils.getConnector(this.conf));
- ((AccumuloSelectivityEvalDAO)eval).setRdfEvalDAO(new ProspectorServiceEvalStatsDAO(ConfigUtils.getConnector(this.conf), this.conf));
- eval.init();
- } catch (AccumuloException e) {
- e.printStackTrace();
- } catch (AccumuloSecurityException e) {
- e.printStackTrace();
- }
-
- isEvalDaoSet = true;
- } else {
- eval = null;
- isEvalDaoSet = true;
- }
- }
-
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Applies generally applicable optimizations: path expressions are sorted
- * from more to less specific.
- *
- * @param tupleExpr
- */
- @Override
- public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
- tupleExpr.visit(new JoinVisitor());
- }
-
- protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> {
-
- @Override
- public void meet(Join node) {
- try {
- if (node.getLeftArg() instanceof FixedStatementPattern && node.getRightArg() instanceof DoNotExpandSP) {
- return;
- }
- List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>());
- HashMultimap<String, StatementPattern> varMap = getVarBins(joinArgs);
- while (!varMap.keySet().isEmpty()) {
- String s = getHighestPriorityKey(varMap);
- constructTuple(varMap, joinArgs, s);
- }
- List<TupleExpr> filterChain = getFilterChain(joinArgs);
-
- for (TupleExpr te : joinArgs) {
- if (!(te instanceof StatementPattern) || !(te instanceof EntityTupleSet)) {
- te.visit(this);
- }
- }
- // Replace old join hierarchy
- node.replaceWith(getNewJoin(joinArgs, filterChain));
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private List<TupleExpr> getFilterChain(List<TupleExpr> joinArgs) {
- List<TupleExpr> filterTopBottom = Lists.newArrayList();
- TupleExpr filterChainTop = null;
- TupleExpr filterChainBottom = null;
-
- for(int i = 0; i < joinArgs.size(); i++) {
- if(joinArgs.get(i) instanceof Filter) {
- if(filterChainTop == null) {
- filterChainTop = joinArgs.remove(i);
- i--;
- } else if(filterChainBottom == null){
- filterChainBottom = joinArgs.remove(i);
- ((Filter)filterChainTop).setArg(filterChainBottom);
- i--;
- } else {
- ((Filter)filterChainBottom).setArg(joinArgs.remove(i));
- filterChainBottom = ((Filter)filterChainBottom).getArg();
- i--;
- }
- }
- }
- if(filterChainTop != null) {
- filterTopBottom.add(filterChainTop);
- }
- if(filterChainBottom != null) {
- filterTopBottom.add(filterChainBottom);
- }
- return filterTopBottom;
- }
-
- private TupleExpr getNewJoin(List<TupleExpr> joinArgs, List<TupleExpr> filterChain) {
- TupleExpr newJoin;
-
- if (joinArgs.size() > 1) {
- if (filterChain.size() > 0) {
- TupleExpr finalJoinArg = joinArgs.remove(0);
- TupleExpr tempJoin;
- TupleExpr temp = filterChain.get(0);
-
- if (joinArgs.size() > 1) {
- tempJoin = new Join(joinArgs.remove(0), joinArgs.remove(0));
- for (TupleExpr te : joinArgs) {
- tempJoin = new Join(tempJoin, te);
- }
- } else {
- tempJoin = joinArgs.remove(0);
- }
-
- if (filterChain.size() == 1) {
- ((Filter) temp).setArg(tempJoin);
- } else {
- ((Filter) filterChain.get(1)).setArg(tempJoin);
- }
- newJoin = new Join(temp, finalJoinArg);
- } else {
- newJoin = new Join(joinArgs.get(0), joinArgs.get(1));
- joinArgs.remove(0);
- joinArgs.remove(0);
-
- for (TupleExpr te : joinArgs) {
- newJoin = new Join(newJoin, te);
- }
- }
- } else if (joinArgs.size() == 1) {
- if (filterChain.size() > 0) {
- newJoin = filterChain.get(0);
- if (filterChain.size() == 1) {
- ((Filter) newJoin).setArg(joinArgs.get(0));
- } else {
- ((Filter) filterChain.get(1)).setArg(joinArgs.get(0));
- }
- } else {
- newJoin = joinArgs.get(0);
- }
- } else {
- throw new IllegalStateException("JoinArgs size cannot be zero.");
- }
- return newJoin;
- }
-
- private HashMultimap<String, StatementPattern> getVarBins(List<TupleExpr> nodes) {
-
- HashMultimap<String, StatementPattern> varMap = HashMultimap.create();
-
- for (QueryModelNode node : nodes) {
- if (node instanceof StatementPattern) {
- StatementPattern sp = (StatementPattern) node;
- if (sp.getPredicateVar().isConstant()) {
- varMap.put(sp.getSubjectVar().getName(), sp);
- varMap.put(sp.getObjectVar().getName(), sp);
- }
- }
- }
-
- removeInvalidBins(varMap, true);
-
- return varMap;
- }
-
- private void updateVarMap(HashMultimap<String, StatementPattern> varMap, Set<StatementPattern> bin) {
-
- for (StatementPattern sp : bin) {
- varMap.remove(sp.getSubjectVar().getName(), sp);
- varMap.remove(sp.getObjectVar().getName(), sp);
- }
-
- removeInvalidBins(varMap, false);
-
- }
-
- private void removeInvalidBins(HashMultimap<String, StatementPattern> varMap, boolean newMap) {
-
- Set<String> keys = Sets.newHashSet(varMap.keySet());
-
- if (newMap) {
- for (String s : keys) {
- Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s));
- if (!StarQuery.isValidStarQuery(spSet)) {
- for (StatementPattern sp : spSet) {
- varMap.remove(s, sp);
- }
- }
-
- }
- } else {
-
- for (String s : keys) {
- Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s));
- if (spSet.size() == 1) {
- for (StatementPattern sp : spSet) {
- varMap.remove(s, sp);
- }
- }
-
- }
- }
-
- }
-
- private void constructTuple(HashMultimap<String, StatementPattern> varMap, List<TupleExpr> joinArgs,
- String binName) {
-
- Set<StatementPattern> bin = Sets.newHashSet(varMap.get(binName));
- StarQuery sq = new StarQuery(bin);
-
- updateVarMap(varMap, bin);
- for (StatementPattern sp : bin) {
- joinArgs.remove(sp);
- }
-
- joinArgs.add(new EntityTupleSet(sq, conf));
-
- }
-
- private String getHighestPriorityKey(HashMultimap<String, StatementPattern> varMap) {
-
- double tempPriority = -1;
- double priority = -Double.MAX_VALUE;
- String priorityKey = "";
- Set<StatementPattern> bin = null;
-
- Set<String> keys = varMap.keySet();
-
- for (String s : keys) {
- bin = varMap.get(s);
- tempPriority = bin.size();
- tempPriority *= getCardinality(bin);
- tempPriority *= getMinCardSp(bin);
-
- // weight starQuery where common Var is constant slightly more -- this factor is subject
- // to change
- if(s.startsWith("-const-")) {
- tempPriority *= 10;
- }
- if (tempPriority > priority) {
- priority = tempPriority;
- priorityKey = s;
- }
- }
- return priorityKey;
- }
-
- private double getMinCardSp(Collection<StatementPattern> nodes) {
-
- double cardinality = Double.MAX_VALUE;
- double tempCard = -1;
-
- if (eval == null) {
- return 1;
- }
-
- for (StatementPattern sp : nodes) {
-
- try {
- tempCard = eval.getCardinality(conf, sp);
-
- if (tempCard < cardinality) {
- cardinality = tempCard;
-
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- return cardinality;
-
- }
-
- private double getCardinality(Collection<StatementPattern> spNodes) {
-
- double cardinality = Double.MAX_VALUE;
- double tempCard = -1;
-
-
- if(eval == null) {
- return 1;
- }
-
- List<StatementPattern> nodes = Lists.newArrayList(spNodes);
-
- AccumuloSelectivityEvalDAO ase = (AccumuloSelectivityEvalDAO) eval;
- ase.setDenormalized(true);
-
- try {
-
- for (int i = 0; i < nodes.size(); i++) {
- for (int j = i + 1; j < nodes.size(); j++) {
- tempCard = ase.getJoinSelect(conf, nodes.get(i), nodes.get(j));
- if (tempCard < cardinality) {
- cardinality = tempCard;
- }
- }
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- ase.setDenormalized(false);
-
- return cardinality / (nodes.size() + 1);
-
- }
-
- protected <L extends List<TupleExpr>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) {
- if (tupleExpr instanceof Join) {
- if (!(((Join) tupleExpr).getLeftArg() instanceof FixedStatementPattern)
- && !(((Join) tupleExpr).getRightArg() instanceof DoNotExpandSP)) {
- Join join = (Join) tupleExpr;
- getJoinArgs(join.getLeftArg(), joinArgs);
- getJoinArgs(join.getRightArg(), joinArgs);
- }
- } else if(tupleExpr instanceof Filter) {
- joinArgs.add(tupleExpr);
- getJoinArgs(((Filter)tupleExpr).getArg(), joinArgs);
- } else {
- joinArgs.add(tupleExpr);
- }
-
- return joinArgs;
- }
-
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java
deleted file mode 100644
index dbe7a53..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java
+++ /dev/null
@@ -1,264 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.StarQuery.CardinalityStatementPattern;
-import mvm.rya.joinselect.AccumuloSelectivityEvalDAO;
-import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO;
-import mvm.rya.rdftriplestore.RdfCloudTripleStore;
-import mvm.rya.rdftriplestore.RdfCloudTripleStoreConnection;
-import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.commons.io.IOUtils;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.Var;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
-import org.openrdf.sail.SailException;
-
-import com.beust.jcommander.internal.Sets;
-import com.google.common.base.Joiner;
-
-public class EntityTupleSet extends ExternalSet implements ExternalBatchingIterator {
-
-
- private StarQuery starQuery;
- private RdfCloudTripleStoreConfiguration conf;
- private Set<String> variables;
- private double cardinality = -1;
- private StatementPattern minSp;
- private double minCard;
- private Connector accCon = null;
- private boolean evalOptUsed = false;
-
- public EntityTupleSet() {
-
- }
-
- public EntityTupleSet(StarQuery sq, RdfCloudTripleStoreConfiguration conf) {
- this.starQuery = sq;
- this.conf = conf;
-
- variables = Sets.newHashSet();
- if(!starQuery.commonVarConstant()) {
- variables.add(starQuery.getCommonVarName());
- }
- variables.addAll(starQuery.getUnCommonVars());
-
- init();
-
- }
-
- public EntityTupleSet(StarQuery sq, RdfCloudTripleStoreConfiguration conf, boolean evalOptUsed) {
- this(sq,conf);
- this.evalOptUsed = evalOptUsed;
- }
-
- private void init() {
-
- try {
- accCon = ConfigUtils.getConnector(conf);
- } catch (AccumuloException e) {
- e.printStackTrace();
- } catch (AccumuloSecurityException e) {
- e.printStackTrace();
- }
- if (conf.isUseStats() && conf.isUseSelectivity()) {
-
- ProspectorServiceEvalStatsDAO evalDao = new ProspectorServiceEvalStatsDAO(accCon, conf);
- evalDao.init();
- AccumuloSelectivityEvalDAO ase = new AccumuloSelectivityEvalDAO(conf, accCon);
- ase.setRdfEvalDAO(evalDao);
- ase.init();
-
- cardinality = starQuery.getCardinality(ase);
- CardinalityStatementPattern csp = starQuery.getMinCardSp(ase);
-
- minCard = csp.getCardinality();
- minSp = csp.getSp();
- } else {
- // TODO come up with a better default if cardinality is not
- // initialized
- cardinality = minCard = 1;
- minSp = starQuery.getNodes().get(0);
- }
-
- }
-
- @Override
- public Set<String> getBindingNames() {
- return starQuery.getBindingNames();
- }
-
- @Override
- public Set<String> getAssuredBindingNames() {
- return starQuery.getAssuredBindingNames();
- }
-
- public Set<String> getVariables() {
- return variables;
- }
-
-
- @Override
- public String getSignature() {
- return "(EntityCentric Projection) " + " common Var: " + starQuery.getCommonVarName() + " variables: " + Joiner.on(", ").join(variables).replaceAll("\\s+", " ");
- }
-
- public StarQuery getStarQuery() {
- return starQuery;
- }
-
- public void setStarQuery(StarQuery sq) {
- this.starQuery = sq;
- }
-
-
- @Override
- public EntityTupleSet clone() {
- StarQuery sq = new StarQuery(starQuery);
- return new EntityTupleSet(sq, conf);
- }
-
-
- @Override
- public double cardinality() {
- return cardinality;
- }
-
-
- public double getMinSpCard() {
- return minCard;
- }
-
-
- @Override
- public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) throws QueryEvaluationException {
-
- // if starQuery contains node with cardinality less than 1000 and node
- // only has one variable, and number of SPs in starQuery is greater than 2, it is
- // more efficient to first evaluate this node and then pass the bindings
- // into the remainder of the star query to be evaluated
- if (minCard < 1000 && starQuery.size() > 2 && numberOfSpVars(minSp) == 1 && !starQuery.commonVarConstant()) {
-
- try {
- RdfCloudTripleStoreConnection conn = getRyaSailConnection();
- CloseableIteration<BindingSet, QueryEvaluationException> sol = (CloseableIteration<BindingSet, QueryEvaluationException>) conn
- .evaluate(minSp, null, bindings, false);
-
- Set<BindingSet> bSet = Sets.newHashSet();
- while (sol.hasNext()) {
- //TODO this is not optimal - should check if bindings variables intersect minSp variables
- //creating the following QueryBindingSet is only necessary if no intersection occurs
- QueryBindingSet bs = new QueryBindingSet();
- bs.addAll(sol.next());
- bs.addAll(bindings);
- bSet.add(bs);
- }
-
- List<StatementPattern> spList = starQuery.getNodes();
- spList.remove(minSp);
-
- StarQuery sq = new StarQuery(spList);
- conn.close();
-
- return (new EntityTupleSet(sq, conf, true)).evaluate(bSet);
-
- } catch (Exception e) {
- throw new QueryEvaluationException(e);
- }
- } else {
- this.evalOptUsed = true;
- return this.evaluate(Collections.singleton(bindings));
- }
-
- }
-
-
- private int numberOfSpVars(StatementPattern sp) {
- List<Var> varList = sp.getVarList();
- int varCount = 0;
-
- for(int i = 0; i < 3; i++) {
- if(!varList.get(i).isConstant()) {
- varCount++;
- }
- }
-
- return varCount;
- }
-
-
- @Override
- public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) throws QueryEvaluationException {
-
- if(bindingset.size() < 2 && !this.evalOptUsed) {
- BindingSet bs = new QueryBindingSet();
- if (bindingset.size() == 1) {
- bs = bindingset.iterator().next();
- }
- return this.evaluate(bs);
- }
- //TODO possibly refactor if bindingset.size() > 0 to take advantage of optimization in evaluate(BindingSet bindingset)
- AccumuloDocIdIndexer adi = null;
- try {
- adi = new AccumuloDocIdIndexer(conf);
- return adi.queryDocIndex(starQuery, bindingset);
- } catch (Exception e) {
- throw new QueryEvaluationException(e);
- } finally {
- IOUtils.closeQuietly(adi);
- }
- }
-
-
- private RdfCloudTripleStoreConnection getRyaSailConnection() throws AccumuloException,
- AccumuloSecurityException, SailException {
- final RdfCloudTripleStore store = new RdfCloudTripleStore();
- AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
- crdfdao.setConnector(accCon);
- AccumuloRdfConfiguration acc = new AccumuloRdfConfiguration(conf);
- crdfdao.setConf(acc);
- store.setRyaDAO(crdfdao);
- store.initialize();
-
- return (RdfCloudTripleStoreConnection) store.getConnection();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java
deleted file mode 100644
index e9d2f85..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java
+++ /dev/null
@@ -1,636 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import mvm.rya.accumulo.documentIndex.TextColumn;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.persist.joinselect.SelectivityEvalDAO;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.joinselect.AccumuloSelectivityEvalDAO;
-
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.hadoop.io.Text;
-import org.openrdf.model.Value;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.Var;
-
-import com.beust.jcommander.internal.Maps;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Bytes;
-
-public class StarQuery {
-
- private List<StatementPattern> nodes;
- private TextColumn[] nodeColumnCond;
- private String commonVarName;
- private Var commonVar;
- private Var context;
- private String contextURI ="";
- private Map<String,Integer> varPos = Maps.newHashMap();
- private boolean isCommonVarURI = false;
-
-
- public StarQuery(List<StatementPattern> nodes) {
- this.nodes = nodes;
- if(nodes.size() == 0) {
- throw new IllegalArgumentException("Nodes cannot be empty!");
- }
- nodeColumnCond = new TextColumn[nodes.size()];
- Var tempContext = (Var) nodes.get(0).getContextVar();
- if(tempContext != null) {
- context = (Var)tempContext.clone();
- } else {
- context = new Var();
- }
- try {
- this.init();
- } catch (RyaTypeResolverException e) {
- e.printStackTrace();
- }
- }
-
-
- public StarQuery(Set<StatementPattern> nodes) {
- this(Lists.newArrayList(nodes));
- }
-
- public int size() {
- return nodes.size();
- }
-
- public StarQuery(StarQuery other) {
- this(other.nodes);
- }
-
-
- public List<StatementPattern> getNodes() {
- return nodes;
- }
-
-
- public TextColumn[] getColumnCond() {
- return nodeColumnCond;
- }
-
-
- public boolean isCommonVarURI() {
- return isCommonVarURI;
- }
-
- public String getCommonVarName() {
- return commonVarName;
- }
-
- public Var getCommonVar() {
- return commonVar;
- }
-
- public boolean commonVarHasValue() {
- return commonVar.getValue() != null;
- }
-
- public boolean commonVarConstant() {
- return commonVar.isConstant();
- }
-
- public String getCommonVarValue() {
- if(commonVarHasValue()) {
- return commonVar.getValue().stringValue();
- } else {
- return null;
- }
- }
-
-
- public Set<String> getUnCommonVars() {
- return varPos.keySet();
- }
-
-
- public Map<String,Integer> getVarPos() {
- return varPos;
- }
-
- public boolean hasContext() {
- return context.getValue() != null;
- }
-
- public String getContextURI() {
- return contextURI;
- }
-
-
-
-
- public Set<String> getBindingNames() {
-
- Set<String> bindingNames = Sets.newHashSet();
-
- for(StatementPattern sp: nodes) {
-
- if(bindingNames.size() == 0) {
- bindingNames = sp.getBindingNames();
- } else {
- bindingNames = Sets.union(bindingNames, sp.getBindingNames());
- }
-
- }
-
- return bindingNames;
-
- }
-
-
-
-
- public Set<String> getAssuredBindingNames() {
-
- Set<String> bindingNames = Sets.newHashSet();
-
- for(StatementPattern sp: nodes) {
-
- if(bindingNames.size() == 0) {
- bindingNames = sp.getAssuredBindingNames();
- } else {
- bindingNames = Sets.union(bindingNames, sp.getAssuredBindingNames());
- }
-
- }
-
- return bindingNames;
-
- }
-
-
-
-
-
-
-
- public CardinalityStatementPattern getMinCardSp(AccumuloSelectivityEvalDAO ase) {
-
- StatementPattern minSp = null;
- double cardinality = Double.MAX_VALUE;
- double tempCard = -1;
-
- for (StatementPattern sp : nodes) {
-
- try {
- tempCard = ase.getCardinality(ase.getConf(), sp);
-
- if (tempCard < cardinality) {
- cardinality = tempCard;
- minSp = sp;
- }
- } catch (TableNotFoundException e) {
- e.printStackTrace();
- }
-
-
- }
-
- return new CardinalityStatementPattern(minSp, cardinality) ;
- }
-
-
-
- public class CardinalityStatementPattern {
-
- private StatementPattern sp;
- private double cardinality;
-
- public CardinalityStatementPattern(StatementPattern sp, double cardinality) {
- this.sp = sp;
- this.cardinality = cardinality;
- }
-
- public StatementPattern getSp() {
- return sp;
- }
-
- public double getCardinality() {
- return cardinality;
- }
-
- }
-
-
- public double getCardinality( AccumuloSelectivityEvalDAO ase) {
-
- double cardinality = Double.MAX_VALUE;
- double tempCard = -1;
-
- ase.setDenormalized(true);
-
- try {
-
- for (int i = 0; i < nodes.size(); i++) {
- for (int j = i + 1; j < nodes.size(); j++) {
-
- tempCard = ase.getJoinSelect(ase.getConf(), nodes.get(i), nodes.get(j));
-
- if (tempCard < cardinality) {
- cardinality = tempCard;
- }
-
- }
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- ase.setDenormalized(false);
-
- return cardinality/(nodes.size() + 1);
-
- }
-
-
-
- public static Set<String> getCommonVars(StarQuery query, BindingSet bs) {
-
- Set<String> starQueryVarNames = Sets.newHashSet();
-
- if(bs == null || bs.size() == 0) {
- return Sets.newHashSet();
- }
-
- Set<String> bindingNames = bs.getBindingNames();
- starQueryVarNames.addAll(query.getUnCommonVars());
- if(!query.commonVarConstant()) {
- starQueryVarNames.add(query.getCommonVarName());
- }
-
- return Sets.intersection(bindingNames, starQueryVarNames);
-
-
- }
-
-
-
-
-
-
- public static StarQuery getConstrainedStarQuery(StarQuery query, BindingSet bs) {
-
- if(bs.size() == 0) {
- return query;
- }
-
- Set<String> bindingNames = bs.getBindingNames();
- Set<String> unCommonVarNames = query.getUnCommonVars();
- Set<String> intersectVar = Sets.intersection(bindingNames, unCommonVarNames);
-
-
- if (!query.commonVarConstant()) {
-
- Value v = bs.getValue(query.getCommonVarName());
-
- if (v != null) {
- query.commonVar.setValue(v);
- }
- }
-
- for(String s: intersectVar) {
- try {
- query.nodeColumnCond[query.varPos.get(s)] = query.setValue(query.nodeColumnCond[query.varPos.get(s)], bs.getValue(s));
- } catch (RyaTypeResolverException e) {
- e.printStackTrace();
- }
- }
-
- return query;
- }
-
-
- private TextColumn setValue(TextColumn tc, Value v) throws RyaTypeResolverException {
-
- String cq = tc.getColumnQualifier().toString();
- String[] cqArray = cq.split("\u0000");
-
- if (cqArray[0].equals("subject")) {
- // RyaURI subjURI = (RyaURI) RdfToRyaConversions.convertValue(v);
- tc.setColumnQualifier(new Text("subject" + "\u0000" + v.stringValue()));
- tc.setIsPrefix(false);
- } else if (cqArray[0].equals("object")) {
- RyaType objType = RdfToRyaConversions.convertValue(v);
- byte[][] b1 = RyaContext.getInstance().serializeType(objType);
- byte[] b2 = Bytes.concat("object".getBytes(),
- "\u0000".getBytes(), b1[0], b1[1]);
- tc.setColumnQualifier(new Text(b2));
- tc.setIsPrefix(false);
- } else {
- throw new IllegalStateException("Invalid direction!");
- }
-
- return tc;
-
- }
-
-
-
- //assumes nodes forms valid star query with only one common variable
- //assumes nodes and commonVar has been set
- private TextColumn nodeToTextColumn(StatementPattern node, int i) throws RyaTypeResolverException {
-
- RyaContext rc = RyaContext.getInstance();
-
- Var subjVar = node.getSubjectVar();
- Var predVar = node.getPredicateVar();
- Var objVar = node.getObjectVar();
-
- RyaURI predURI = (RyaURI) RdfToRyaConversions.convertValue(node.getPredicateVar().getValue());
-
-
- //assumes StatementPattern contains at least on variable
- if (subjVar.isConstant()) {
- if (commonVarConstant()) {
- varPos.put(objVar.getName(), i);
- return new TextColumn(new Text(predURI.getData()), new Text("object"));
- } else {
- return new TextColumn(new Text(predURI.getData()), new Text("subject" + "\u0000"
- + subjVar.getValue().stringValue()));
- }
-
- } else if (objVar.isConstant()) {
-
- if (commonVarConstant()) {
- varPos.put(subjVar.getName(), i);
- return new TextColumn(new Text(predURI.getData()), new Text("subject"));
- } else {
-
- isCommonVarURI = true;
- RyaType objType = RdfToRyaConversions.convertValue(objVar.getValue());
- byte[][] b1 = rc.serializeType(objType);
-
- byte[] b2 = Bytes.concat("object".getBytes(), "\u0000".getBytes(), b1[0], b1[1]);
- return new TextColumn(new Text(predURI.getData()), new Text(b2));
- }
-
- } else {
- if (subjVar.getName().equals(commonVarName)) {
-
- isCommonVarURI = true;
- varPos.put(objVar.getName(), i);
-
- TextColumn tc = new TextColumn(new Text(predURI.getData()), new Text("object"));
- tc.setIsPrefix(true);
- return tc;
-
- } else {
-
- varPos.put(subjVar.getName(), i);
-
- TextColumn tc = new TextColumn(new Text(predURI.getData()), new Text("subject"));
- tc.setIsPrefix(true);
- return tc;
-
- }
-
-
- }
-
-
- }
-
-
-
-
- //called in constructor after nodes set
- //assumes nodes and nodeColumnCond are same size
- private void init() throws RyaTypeResolverException {
-
-
- commonVar = this.getCommonVar(nodes);
- if(!commonVar.isConstant()) {
- commonVarName = commonVar.getName();
- } else {
- commonVarName = commonVar.getName().substring(7);
- }
-
- if(hasContext()) {
- RyaURI ctxtURI = (RyaURI) RdfToRyaConversions.convertValue(context.getValue());
- contextURI = ctxtURI.getData();
- }
-
- for(int i = 0; i < nodes.size(); i++){
- nodeColumnCond[i] = nodeToTextColumn(nodes.get(i), i);
- }
-
- }
-
-
-
-
-
-
-
-
- // called after nodes set
- // assumes nodes forms valid query with single, common variable
- private Var getCommonVar(List<StatementPattern> nodes) {
-
- Set<Var> vars = null;
- List<Var> tempVar;
- Set<Var> tempSet;
-
- int i = 0;
- for (StatementPattern sp : nodes) {
-
- if (vars == null) {
- vars = Sets.newHashSet();
- vars.add(sp.getSubjectVar());
- vars.add(sp.getObjectVar());
- } else {
- tempSet = Sets.newHashSet();
- tempSet.add(sp.getSubjectVar());
- tempSet.add(sp.getObjectVar());
- vars = Sets.intersection(vars, tempSet);
- }
-
- }
-
- if (vars.size() == 1) {
- return vars.iterator().next();
- } else if (vars.size() > 1) {
- Var first = null;
-
- i = 0;
-
- for (Var v : vars) {
- i++;
-
- if (i == 1) {
- first = v;
- } else {
- if (v.isConstant()) {
- return v;
- }
- }
- }
-
- return first;
-
- } else {
- throw new IllegalStateException("No common Var!");
- }
-
- }
-
-
- //assumes bindings is not of size 0
- private static boolean isBindingsetValid(Set<String> bindings) {
-
- int varCount = 0;
-
- if (bindings.size() == 1) {
- return true;
- } else {
-
-
- for (String s : bindings) {
- if (!s.startsWith("-const-")) {
- varCount++;
- }
- if (varCount > 1) {
- return false;
- }
- }
-
- return true;
-
- }
-
- }
-
-
-
-
-
- public static boolean isValidStarQuery(Collection<StatementPattern> nodes) {
-
- Set<String> bindings = null;
- boolean contextSet = false;
- Var context = null;
-
- if(nodes.size() < 2) {
- return false;
- }
-
- for(StatementPattern sp: nodes) {
-
- Var tempContext = sp.getContextVar();
- Var predVar = sp.getPredicateVar();
-
- //does not support variable context
- if(tempContext != null && !tempContext.isConstant()) {
- return false;
- }
- if(!contextSet) {
- context = tempContext;
- contextSet = true;
- } else {
-
- if(context == null && tempContext != null) {
- return false;
- } else if (context != null && !context.equals(tempContext)) {
- return false;
- }
- }
-
- if(!predVar.isConstant()) {
- return false;
- }
-
- if(bindings == null ) {
- bindings = sp.getBindingNames();
- if(bindings.size() == 0) {
- return false;
- }
- } else {
- bindings = Sets.intersection(bindings, sp.getBindingNames());
- if(bindings.size() == 0) {
- return false;
- }
- }
-
- }
-
-
- return isBindingsetValid(bindings);
- }
-
-
-
-
-
-// private static Set<String> getSpVariables(StatementPattern sp) {
-//
-// Set<String> variables = Sets.newHashSet();
-// List<Var> varList = sp.getVarList();
-//
-// for(Var v: varList) {
-// if(!v.isConstant()) {
-// variables.add(v.getName());
-// }
-// }
-//
-// return variables;
-//
-// }
-//
-
-
-
-
-
- public String toString() {
-
- String s = "Term conditions: " + "\n";
-
- for(int i = 0; i < this.nodeColumnCond.length; i++) {
- s = s + nodeColumnCond[i].toString() + "\n";
- }
-
- s = s + "Common Var: " + this.commonVar.toString() + "\n";
- s = s + "Context: " + this.contextURI;
-
- return s;
-
- }
-
-
-
-
-
-
-}