You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/15 20:06:45 UTC
[25/69] [abbrv] [partial] incubator-rya git commit: RYA-198 Renaming
Files
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java
deleted file mode 100644
index f4e6d95..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package mvm.rya.indexing;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.io.Serializable;
-
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-/**
- * Time and date interface for building intervals.
- *
- *Implementations:
- * Implementation should have a factory method for TemporalInterval since TemporalIntervals reference only this
- * interface for begin & end, so it injects an implementation.
- * public static TemporalInterval parseInterval(String dateTimeInterval)
- *
- * The following are notes and may not have been implemented.
- *
- * = rfc3339
- *https://www.ietf.org/rfc/rfc3339.txt
- * a subset of ISO-8601
- * YYYY-MM-DDThh:mm:ss.fffZ
- * Limits:
- *All dates and times are assumed to be in the "current era",
- somewhere between 0000AD and 9999AD.
- * resolution: to the second, or millisecond if the optional fraction is used.
- *
- * = epoch
- * 32bit or 64bit integer specifying the number of seconds since a standard date-time (1970)
- * 32bit is good until 2038.
- * 64bit is good until after the heat death of our universe
- *
- */
-public interface TemporalInstant extends Comparable<TemporalInstant>, Serializable {
- @Override
- public boolean equals(Object obj) ;
-
- @Override
- public int compareTo(TemporalInstant o) ;
-
- @Override
- public int hashCode() ;
- /**
- * Get the date as a byte array.
- */
- public byte[] getAsKeyBytes();
- /**
- * Get the date as a String.
- */
- public String getAsKeyString();
- /**
- * Get the date as a human readable for reporting with timeZone.
- */
- public String getAsReadable(DateTimeZone tz);
- /**
- * Get the date as a human readable for reporting, timeZone is implementation specific.
- */
- public String getAsReadable();
- /**
- * Get the date as a Joda/Java v8 DateTime.
- */
- public DateTime getAsDateTime();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java
deleted file mode 100644
index f47bb92..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- *
- */
-package mvm.rya.indexing;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.codec.binary.StringUtils;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-/**
- * Immutable date and time instance returning a human readable key.
- * Preserves the Time zone, but not stored in the key.
- * Converts fields (hours, etc) correctly for tz=Zulu when stored,
- * so the original timezone is not preserved when retrieved.
- *
- * Uses rfc 3339, which looks like: YYYY-MM-DDThh:mm:ssZ a subset
- * of ISO-8601 : https://www.ietf.org/rfc/rfc3339.txt
- *
- * Limits: All dates and times are assumed to be in the "current era", no BC,
- * somewhere between 0000AD and 9999AD.
- *
- * Resolution: to the second, or millisecond if the optional fraction is used.
- *
- * This is really a wrapper for Joda DateTime. if you need functionality from
- * that wonderful class, simply use t.getAsDateTime().
- *
- */
-public class TemporalInstantRfc3339 implements TemporalInstant {
-
- private static final long serialVersionUID = -7790000399142290309L;
-
- private final DateTime dateTime;
- /**
- * Format key like this: YYYY-MM-DDThh:mm:ssZ
- */
- public final static DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTimeNoMillis();
-
- public static final Pattern PATTERN = Pattern.compile("\\[(.*)\\,(.*)\\].*");
-
- /**
- * New date assumed UTC time zone.
- *
- * @param year
- * @param month
- * @param day
- * @param hour
- * @param minute
- * @param second
- */
- public TemporalInstantRfc3339(final int year, final int month, final int day, final int hour, final int minute, final int second) {
- dateTime = new DateTime(year, month, day, hour, minute, second, DateTimeZone.UTC);
- }
-
- /**
- * Construct with a Joda/java v8 DateTime;
- * TZ is preserved, but not in the key.
- *
- * @param dateTime
- * initialize with this date time. Converted to zulu time zone for key generation.
- * @return
- */
- public TemporalInstantRfc3339(final DateTime datetime) {
- dateTime = datetime;
- }
- /**
- * Get an interval setting beginning and end with this implementation of {@link TemporalInstant}.
- * beginning must be less than end.
- *
- * @param dateTimeInterval String in the form [dateTime1,dateTime2]
- */
- public static TemporalInterval parseInterval(final String dateTimeInterval) {
-
- final Matcher matcher = PATTERN.matcher(dateTimeInterval);
- if (matcher.find()) {
- // Got a date time pair, parse into an interval.
- return new TemporalInterval(
- new TemporalInstantRfc3339(new DateTime(matcher.group(1))),
- new TemporalInstantRfc3339(new DateTime(matcher.group(2))));
- }
- throw new IllegalArgumentException("Can't parse interval, expecting '[ISO8601dateTime1,ISO8601dateTime2]', actual: "+dateTimeInterval);
- }
-
- /**
- * if this is older returns -1, equal 0, else 1
- *
- */
- @Override
- public int compareTo(final TemporalInstant that) {
- return getAsKeyString().compareTo(that.getAsKeyString());
- }
-
- @Override
- public byte[] getAsKeyBytes() {
- return StringUtils.getBytesUtf8(getAsKeyString());
- }
-
- @Override
- public String getAsKeyString() {
- return dateTime.withZone(DateTimeZone.UTC).toString(FORMATTER);
- }
-
- /**
- * Readable string, formated local time at {@link DateTimeZone}.
- * If the timezone is UTC (Z), it was probably a key from the database.
- * If the server and client are in different Time zone, should probably use the client timezone.
- *
- * Time at specified time zone:
- * instant.getAsReadable(DateTimeZone.forID("-05:00")));
- * instant.getAsReadable(DateTimeZone.getDefault()));
- *
- * Use original time zone set in the constructor:
- * instant.getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER));
- *
- */
- @Override
- public String getAsReadable(final DateTimeZone dateTimeZone) {
- return dateTime.withZone(dateTimeZone).toString(FORMATTER);
- }
-
- /**
- * Use original time zone set in the constructor, or UTC if from parsing the key.
- */
- @Override
- public String getAsReadable() {
- return dateTime.toString(FORMATTER);
- }
-
- /**
- * default toString, same as getAsReadable().
- */
- @Override
- public String toString() {
- return getAsReadable();
- }
-
- /**
- * Show readable time converted to the default timezone.
- */
- @Override
- public DateTime getAsDateTime() {
- return dateTime;
- }
-
- /**
- * Minimum Date, used for infinitely past.
- */
- private static final TemporalInstant MINIMUM = new TemporalInstantRfc3339(new DateTime(Long.MIN_VALUE));
- /**
- * maximum date/time is used for infinitely in the future.
- */
- private static final TemporalInstant MAXIMUM = new TemporalInstantRfc3339(new DateTime(Long.MAX_VALUE));
-
- /**
- * infinite past date.
- * @return an instant that will compare as NEWER than anything but itself.
- */
- public static TemporalInstant getMinimumInstance() {
- return MINIMUM;
- }
- /**
- * infinite future date.
- * @return an instant that will compare as OLDER than anything but itself
- */
-
- public static TemporalInstant getMaximumInstance() {
- return MAXIMUM;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode() {
- return getAsKeyString().hashCode();
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final TemporalInstantRfc3339 other = (TemporalInstantRfc3339) obj;
- return (getAsKeyString().equals(other.getAsKeyString()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java
deleted file mode 100644
index b23b99c..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package mvm.rya.indexing;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.io.UnsupportedEncodingException;
-
-/**
- * A time with beginning and end date and time, which could be indefinitely in
- * the past or future. Immutable, so it's thread safe. For use in reading and
- * writing from Rya's temporal indexing scheme.
- *
- */
-public class TemporalInterval implements Comparable<TemporalInterval> {
-
- // the beginning and end. Read-only because they are final references to immutable objects.
- private final TemporalInstant hasBeginning;
- private final TemporalInstant hasEnd;
-
- /**
- * Separate the beginning and end with this.
- * Used because Joda time library's interval uses this.
- * TODO: Move this down to the TemporalInterval implementation.
- * TODO: Then add a TemporalInterval.keyConcatenate().
- */
- public static final String DELIMITER = "/";
-
-// /**
-// * Empty constructor -- not allowed, no defaults.
-// * For an infinite span of time: do it like this:
-// * new TemporalInterval(TemporalInstantImpl.getMinimum, TemporalInstantImpl.getMaximum)
-// */
-// public TemporalInterval() {
-// hasBeginning = null;
-// hasEnd = null;
-// }
-
- /**
- * Constructor setting beginning and end with an implementation of {@link TemporalInstant}.
- * beginning must be less than end.
- *
- * @param hasBeginning
- * @param hasEnd
- */
- public TemporalInterval(TemporalInstant hasBeginning, TemporalInstant hasEnd) {
- super();
- if (hasBeginning != null && hasEnd != null && 0 < hasBeginning.compareTo(hasEnd))
- throw new IllegalArgumentException("The Beginning instance must not compare greater than the end.");
- this.hasBeginning = hasBeginning;
- this.hasEnd = hasEnd;
- }
-
- /**
- * @return the hasBeginning
- */
- public TemporalInstant getHasBeginning() {
- return hasBeginning;
- }
-
- /**
- * @return the hasEnd
- */
- public TemporalInstant getHasEnd() {
- return hasEnd;
- }
-
- /**
- * True if CompareTo() says equal (0)
- */
- @Override
- public boolean equals(Object other) {
- return other instanceof TemporalInterval
- && this.compareTo((TemporalInterval) other) == 0;
- };
-
- /**
- * Compare beginnings, if the same then compare ends, or equal if beginnings equal and endings equal.
- * Nulls represent infinity.
- */
- @Override
- public int compareTo(TemporalInterval other) {
- int compBegins = this.hasBeginning.compareTo(other.hasBeginning);
- if (0 == compBegins)
- return this.hasEnd.compareTo(other.hasEnd);
- else
- return compBegins;
-
- }
-
- /**
- * Hashcode for
- */
- @Override
- public int hashCode() {
- if (hasBeginning == null)
- if (hasEnd == null)
- return 0;
- else
- return hasEnd.hashCode();
- else
- return hashboth(this.hasBeginning.hashCode(),
- this.hasEnd.hashCode());
- }
-
- /**
- * Hashcode combining two string hashcodes.
- */
- protected static int hashboth(int i1, int i2) {
- // return (int) (( 1L * i1 * i2) ; % (1L + Integer.MAX_VALUE));
- // let the overflow happen. It won't throw an error.
- return (i1 + i2);
- }
-
- /**
- * Get the key use for rowid for the beginning of the interval. Use ascii
- * for conversion to catch and prevent multi-byte chars.
- *
- * @return
- */
- public byte[] getAsKeyBeginning() {
- try {
- return (hasBeginning.getAsKeyString() + DELIMITER + hasEnd
- .getAsKeyString()).getBytes("US-ASCII");
- } catch (UnsupportedEncodingException e) {
- // this is a code error, the strings are mostly numbers.
- throw new Error("while converting key string to ascii bytes", e);
- }
- }
-
- /**
- * get the key used for indexing the end of the interval. Use ascii for
- * conversion to catch and prevent multi-byte chars.
- *
- * @return
- */
- public byte[] getAsKeyEnd() {
- try {
- return (hasEnd.getAsKeyString() + DELIMITER + hasBeginning
- .getAsKeyString()).getBytes("US-ASCII");
- } catch (UnsupportedEncodingException e) {
- // this is a code error, the strings are mostly numbers and ascii
- // symbols.
- throw new Error("while converting key string to ascii bytes", e);
- }
- }
-
- /**
- * Format as a "period" in this paper. This is not a standard, really.
- * http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.298.8948&rep=rep1&type=pdf
- * also consider using the typed literal syntax:
- * "[2010-01-01,2010-01-31]"^^xs:period
- * @return [begindate,enddate] for example: [2010-01-01,2010-01-31]
- *
- */
- public String getAsPair() {
- return "["+hasBeginning.getAsReadable() + "," + hasEnd.getAsReadable() + "]";
- }
-
- @Override
- public String toString() {
- return getAsPair() ;
- // return hasBeginning.getAsReadable() + DELIMITER + hasEnd.getAsReadable();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalTupleSet.java
deleted file mode 100644
index 1677fed..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalTupleSet.java
+++ /dev/null
@@ -1,287 +0,0 @@
-package mvm.rya.indexing;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.joda.time.DateTime;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.QueryModelVisitor;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import info.aduna.iteration.CloseableIteration;
-import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
-
-//Indexing Node for temporal expressions to be inserted into execution plan
-//to delegate temporal portion of query to temporal index
-public class TemporalTupleSet extends ExternalTupleSet {
-
- private final Configuration conf;
- private final TemporalIndexer temporalIndexer;
- private final IndexingExpr filterInfo;
-
- public TemporalTupleSet(final IndexingExpr filterInfo, final TemporalIndexer temporalIndexer) {
- this.filterInfo = filterInfo;
- this.temporalIndexer = temporalIndexer;
- conf = temporalIndexer.getConf();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Set<String> getBindingNames() {
- return filterInfo.getBindingNames();
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Note that we need a deep copy for everything that (during optimizations)
- * can be altered via {@link #visitChildren(QueryModelVisitor)}
- */
- @Override
- public TemporalTupleSet clone() {
- return new TemporalTupleSet(filterInfo, temporalIndexer);
- }
-
- @Override
- public double cardinality() {
- return 0.0; // No idea how the estimate cardinality here.
- }
-
- @Override
- public String getSignature() {
-
- return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(getBindingNames()).replaceAll("\\s+", " ");
- }
-
- @Override
- public boolean equals(final Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof TemporalTupleSet)) {
- return false;
- }
- final TemporalTupleSet arg = (TemporalTupleSet) other;
- return filterInfo.equals(arg.filterInfo);
- }
-
- @Override
- public int hashCode() {
- int result = 17;
- result = 31*result + filterInfo.hashCode();
-
- return result;
- }
-
- /**
- * Returns an iterator over the result set associated with contained IndexingExpr.
- * <p>
- * Should be thread-safe (concurrent invocation {@link OfflineIterable} this
- * method can be expected with some query evaluators.
- */
- @Override
- public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings)
- throws QueryEvaluationException {
- final URI funcURI = filterInfo.getFunction();
- final SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI);
-
- if(filterInfo.getArguments().length > 1) {
- throw new IllegalArgumentException("Index functions do not support more than two arguments.");
- }
-
- final String queryText = filterInfo.getArguments()[0].stringValue();
- return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction);
- }
-
- //returns appropriate search function for a given URI
- //search functions used by TemporalIndexer to query Temporal Index
- private class TemporalSearchFunctionFactory {
- private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
- Configuration conf;
-
- public TemporalSearchFunctionFactory(final Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Get a {@link TemporalSearchFunction} for a give URI.
- *
- * @param searchFunction
- * @return
- */
- public SearchFunction getSearchFunction(final URI searchFunction) {
- SearchFunction geoFunc = null;
- try {
- geoFunc = getSearchFunctionInternal(searchFunction);
- } catch (final QueryEvaluationException e) {
- e.printStackTrace();
- }
-
- return geoFunc;
- }
-
- private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException {
- final SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction);
-
- if (sf != null) {
- return sf;
- } else {
- throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue());
- }
- }
-
- private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
- return temporalIndexer.queryInstantAfterInstant(queryInstant, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantAfterInstant";
- };
- };
- private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
- return temporalIndexer.queryInstantBeforeInstant(queryInstant, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantBeforeInstant";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
- return temporalIndexer.queryInstantEqualsInstant(queryInstant, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantEqualsInstant";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantAfterInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantAfterInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantBeforeInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantBeforeInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantInsideInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantInsideInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantHasBeginningInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantHasBeginningInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
- final StatementConstraints contraints) throws QueryEvaluationException {
- final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantHasEndInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantHasEndInterval";
- };
- };
-
- {
- final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
-
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"after"), TEMPORAL_InstantAfterInstant);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"before"), TEMPORAL_InstantBeforeInstant);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"equals"), TEMPORAL_InstantEqualsInstant);
-
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"beforeInterval"), TEMPORAL_InstantBeforeInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"afterInterval"), TEMPORAL_InstantAfterInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"insideInterval"), TEMPORAL_InstantInsideInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasBeginningInterval"),
- TEMPORAL_InstantHasBeginningInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasEndInterval"), TEMPORAL_InstantHasEndInterval);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
deleted file mode 100644
index 7c608de..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
+++ /dev/null
@@ -1,418 +0,0 @@
-package mvm.rya.indexing.accumulo;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.URIImpl;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.instance.RyaDetails;
-import mvm.rya.indexing.FilterFunctionOptimizer;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
-import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
-import mvm.rya.indexing.accumulo.freetext.Tokenizer;
-import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import mvm.rya.indexing.external.PrecomputedJoinIndexer;
-import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
-import mvm.rya.indexing.pcj.matching.PCJOptimizer;
-
-/**
- * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
- * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods.
- * New code must separate parameters that are set at Rya install time from that which is specific to the client.
- * Also Accumulo index tables are pushed down to the implementation and not configured in conf.
- */
-public class ConfigUtils {
- private static final Logger logger = Logger.getLogger(ConfigUtils.class);
-
- public static final String CLOUDBASE_TBL_PREFIX = "sc.cloudbase.tableprefix";
- public static final String CLOUDBASE_AUTHS = "sc.cloudbase.authorizations";
- public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
- public static final String CLOUDBASE_ZOOKEEPERS = "sc.cloudbase.zookeepers";
- public static final String CLOUDBASE_USER = "sc.cloudbase.username";
- public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";
-
- public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads";
- public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency";
- public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory";
-
- public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit";
-
- public static final String USE_FREETEXT = "sc.use_freetext";
- public static final String USE_TEMPORAL = "sc.use_temporal";
- public static final String USE_ENTITY = "sc.use_entity";
- public static final String USE_PCJ = "sc.use_pcj";
- public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
- public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
-
- public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
- public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
- public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
- public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
-
-
- public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail";
- public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail";
-
- public static final String USE_MOCK_INSTANCE = ".useMockInstance";
-
- public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
-
- private static final int WRITER_MAX_WRITE_THREADS = 1;
- private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE;
- private static final long WRITER_MAX_MEMORY = 10000L;
-
- public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan";
-
- public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates";
- public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text";
- public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term";
-
- public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class";
-
- public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
-
- public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
-
- public static final String USE_MONGO = "sc.useMongo";
-
- public static boolean isDisplayQueryPlan(final Configuration conf){
- return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
- }
-
- /**
- * get a value from the configuration file and throw an exception if the value does not exist.
- *
- * @param conf
- * @param key
- * @return
- */
- private static String getStringCheckSet(final Configuration conf, final String key) {
- final String value = conf.get(key);
- requireNonNull(value, key + " not set");
- return value;
- }
-
- /**
- * @param conf
- * @param tablename
- * @return if the table was created
- * @throws AccumuloException
- * @throws AccumuloSecurityException
- * @throws TableExistsException
- */
- public static boolean createTableIfNotExists(final Configuration conf, final String tablename) throws AccumuloException, AccumuloSecurityException,
- TableExistsException {
- final TableOperations tops = getConnector(conf).tableOperations();
- if (!tops.exists(tablename)) {
- logger.info("Creating table: " + tablename);
- tops.create(tablename);
- return true;
- }
- return false;
- }
-
- /**
- * Lookup the table name prefix in the conf and throw an error if it is null.
- * Future, get table prefix from RyaDetails -- the Rya instance name
- * -- also getting info from the RyaDetails should happen within RyaSailFactory and not ConfigUtils.
- * @param conf Rya configuration map where it extracts the prefix (instance name)
- * @return index table prefix corresponding to this Rya instance
- */
- public static String getTablePrefix(final Configuration conf) {
- final String tablePrefix;
- tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
- requireNonNull(tablePrefix, "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX
- + " not set. Cannot generate table name.");
- return tablePrefix;
- }
-
- public static int getFreeTextTermLimit(final Configuration conf) {
- return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
- }
-
- public static Set<URI> getFreeTextPredicates(final Configuration conf) {
- return getPredicates(conf, FREETEXT_PREDICATES_LIST);
- }
-
- public static Set<URI> getGeoPredicates(final Configuration conf) {
- return getPredicates(conf, GEO_PREDICATES_LIST);
- }
- /**
- * Used for indexing statements about date & time instances and intervals.
- * @param conf
- * @return Set of predicate URI's whose objects should be date time literals.
- */
- public static Set<URI> getTemporalPredicates(final Configuration conf) {
- return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
- }
-
- protected static Set<URI> getPredicates(final Configuration conf, final String confName) {
- final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
- final Set<URI> predicates = new HashSet<URI>();
- for (final String prediateString : validPredicateStrings) {
- predicates.add(new URIImpl(prediateString));
- }
- return predicates;
- }
-
- public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
- final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
- return ReflectionUtils.newInstance(c, conf);
- }
-
- public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) throws TableNotFoundException,
- AccumuloException, AccumuloSecurityException {
- final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
- final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
- final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
- final Connector connector = ConfigUtils.getConnector(conf);
- return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
- }
-
- public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
- final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
- final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
- final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
- final Connector connector = ConfigUtils.getConnector(conf);
- return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
- }
-
- public static Scanner createScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException,
- TableNotFoundException {
- final Connector connector = ConfigUtils.getConnector(conf);
- final Authorizations auths = ConfigUtils.getAuthorizations(conf);
- return connector.createScanner(tablename, auths);
-
- }
-
- public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException,
- TableNotFoundException {
- final Connector connector = ConfigUtils.getConnector(conf);
- final Authorizations auths = ConfigUtils.getAuthorizations(conf);
- Integer numThreads = null;
- if (conf instanceof RdfCloudTripleStoreConfiguration) {
- numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
- } else {
- numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2);
- }
- return connector.createBatchScanner(tablename, auths, numThreads);
- }
-
- public static int getWriterMaxWriteThreads(final Configuration conf) {
- return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
- }
-
- public static long getWriterMaxLatency(final Configuration conf) {
- return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY);
- }
-
- public static long getWriterMaxMemory(final Configuration conf) {
- return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
- }
-
- public static String getUsername(final JobContext job) {
- return getUsername(job.getConfiguration());
- }
-
- public static String getUsername(final Configuration conf) {
- return conf.get(CLOUDBASE_USER);
- }
-
- public static Authorizations getAuthorizations(final JobContext job) {
- return getAuthorizations(job.getConfiguration());
- }
-
- public static Authorizations getAuthorizations(final Configuration conf) {
- final String authString = conf.get(CLOUDBASE_AUTHS, "");
- if (authString.isEmpty()) {
- return new Authorizations();
- }
- return new Authorizations(authString.split(","));
- }
-
- public static Instance getInstance(final JobContext job) {
- return getInstance(job.getConfiguration());
- }
-
- public static Instance getInstance(final Configuration conf) {
- if (useMockInstance(conf)) {
- return new MockInstance(conf.get(CLOUDBASE_INSTANCE));
- }
- return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS));
- }
-
- public static String getPassword(final JobContext job) {
- return getPassword(job.getConfiguration());
- }
-
- public static String getPassword(final Configuration conf) {
- return conf.get(CLOUDBASE_PASSWORD, "");
- }
-
- public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
- return getConnector(job.getConfiguration());
- }
-
- public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
- final Instance instance = ConfigUtils.getInstance(conf);
-
- return instance.getConnector(getUsername(conf), getPassword(conf));
- }
-
- public static boolean useMockInstance(final Configuration conf) {
- return conf.getBoolean(USE_MOCK_INSTANCE, false);
- }
-
- protected static int getNumPartitions(final Configuration conf) {
- return conf.getInt(NUM_PARTITIONS, 25);
- }
-
- public static int getFreeTextDocNumPartitions(final Configuration conf) {
- return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
- }
-
- public static int getFreeTextTermNumPartitions(final Configuration conf) {
- return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
- }
-
- public static boolean getUseFreeText(final Configuration conf) {
- return conf.getBoolean(USE_FREETEXT, false);
- }
-
- public static boolean getUseTemporal(final Configuration conf) {
- return conf.getBoolean(USE_TEMPORAL, false);
- }
-
- public static boolean getUseEntity(final Configuration conf) {
- return conf.getBoolean(USE_ENTITY, false);
- }
-
- public static boolean getUsePCJ(final Configuration conf) {
- return conf.getBoolean(USE_PCJ, false);
- }
-
- public static boolean getUseOptimalPCJ(final Configuration conf) {
- return conf.getBoolean(USE_OPTIMAL_PCJ, false);
- }
-
- public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
- return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
- }
-
-
- /**
- * @return The name of the Fluo Application this instance of RYA is
- * using to incrementally update PCJs.
- */
- //TODO delete this eventually and use Details table
- public Optional<String> getFluoAppName(Configuration conf) {
- return Optional.fromNullable(conf.get(FLUO_APP_NAME));
- }
-
-
- public static boolean getUseMongo(final Configuration conf) {
- return conf.getBoolean(USE_MONGO, false);
- }
-
-
- public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
-
- final List<String> indexList = Lists.newArrayList();
- final List<String> optimizers = Lists.newArrayList();
-
- boolean useFilterIndex = false;
-
- if (ConfigUtils.getUseMongo(conf)) {
- if (getUseFreeText(conf)) {
- indexList.add(MongoFreeTextIndexer.class.getName());
- useFilterIndex = true;
- }
- } else {
-
- if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
- conf.setPcjOptimizer(PCJOptimizer.class);
- }
-
- if(getUsePcjUpdaterIndex(conf)) {
- indexList.add(PrecomputedJoinIndexer.class.getName());
- }
-
-
- if (getUseFreeText(conf)) {
- indexList.add(AccumuloFreeTextIndexer.class.getName());
- useFilterIndex = true;
- }
-
- if (getUseTemporal(conf)) {
- indexList.add(AccumuloTemporalIndexer.class.getName());
- useFilterIndex = true;
- }
-
- }
-
- if (useFilterIndex) {
- optimizers.add(FilterFunctionOptimizer.class.getName());
- }
-
- if (getUseEntity(conf)) {
- indexList.add(EntityCentricIndex.class.getName());
- optimizers.add(EntityOptimizer.class.getName());
-
- }
-
- conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
- conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
-
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
deleted file mode 100644
index 4c1a3ad..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
+++ /dev/null
@@ -1,443 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE;
-import info.aduna.iteration.CloseableIteration;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.documentIndex.DocIndexIteratorUtil;
-import mvm.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaToRdfConversions;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.indexing.DocIdIndexer;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-import org.openrdf.query.algebra.helpers.StatementPatternCollector;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Bytes;
-
-public class AccumuloDocIdIndexer implements DocIdIndexer {
-
-
-
- private BatchScanner bs;
- private AccumuloRdfConfiguration conf;
-
- public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException {
- Preconditions.checkArgument(conf instanceof RdfCloudTripleStoreConfiguration, "conf must be isntance of RdfCloudTripleStoreConfiguration");
- this.conf = (AccumuloRdfConfiguration) conf;
- //Connector conn = ConfigUtils.getConnector(conf);
- }
-
-
-
-
- public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery,
- Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
-
- SPARQLParser parser = new SPARQLParser();
- ParsedQuery pq1 = null;
- try {
- pq1 = parser.parseQuery(sparqlQuery, null);
- } catch (MalformedQueryException e) {
- e.printStackTrace();
- }
-
- TupleExpr te1 = pq1.getTupleExpr();
- List<StatementPattern> spList1 = StatementPatternCollector.process(te1);
-
- if(StarQuery.isValidStarQuery(spList1)) {
- StarQuery sq1 = new StarQuery(spList1);
- return queryDocIndex(sq1, constraints);
- } else {
- throw new IllegalArgumentException("Invalid star query!");
- }
-
- }
-
-
-
-
- @Override
- public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query,
- Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
-
- final StarQuery starQ = query;
- final Iterator<BindingSet> bs = constraints.iterator();
- final Iterator<BindingSet> bs2 = constraints.iterator();
- final Set<String> unCommonVarNames;
- final Set<String> commonVarNames;
- if (bs2.hasNext()) {
- BindingSet currBs = bs2.next();
- commonVarNames = StarQuery.getCommonVars(query, currBs);
- unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames);
- } else {
- commonVarNames = Sets.newHashSet();
- unCommonVarNames = Sets.newHashSet();
- }
-
- if( commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) {
-
- final HashMultimap<String, BindingSet> map = HashMultimap.create();
- final String commonVar = starQ.getCommonVarName();
- final Iterator<Entry<Key, Value>> intersections;
- final BatchScanner scan;
- Set<Range> ranges = Sets.newHashSet();
-
- while(bs.hasNext()) {
-
- BindingSet currentBs = bs.next();
-
- if(currentBs.getBinding(commonVar) == null) {
- continue;
- }
-
- String row = currentBs.getBinding(commonVar).getValue().stringValue();
- ranges.add(new Range(row));
- map.put(row, currentBs);
-
- }
- scan = runQuery(starQ, ranges);
- intersections = scan.iterator();
-
-
- return new CloseableIteration<BindingSet, QueryEvaluationException>() {
-
-
- private QueryBindingSet currentSolutionBs = null;
- private boolean hasNextCalled = false;
- private boolean isEmpty = false;
- private Iterator<BindingSet> inputSet = new ArrayList<BindingSet>().iterator();
- private BindingSet currentBs;
- private Key key;
-
-
-
- @Override
- public boolean hasNext() throws QueryEvaluationException {
- if (!hasNextCalled && !isEmpty) {
- while (inputSet.hasNext() || intersections.hasNext()) {
- if (!inputSet.hasNext()) {
- key = intersections.next().getKey();
- inputSet = map.get(key.getRow().toString()).iterator();
- }
- currentBs = inputSet.next();
- currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames);
-
- if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() +1) {
- hasNextCalled = true;
- return true;
- }
-
- }
-
- isEmpty = true;
- return false;
-
- } else if (isEmpty) {
- return false;
- } else {
- return true;
- }
-
- }
-
-
- @Override
- public BindingSet next() throws QueryEvaluationException {
-
- if (hasNextCalled) {
- hasNextCalled = false;
- } else if (isEmpty) {
- throw new NoSuchElementException();
- } else {
- if (this.hasNext()) {
- hasNextCalled = false;
- } else {
- throw new NoSuchElementException();
- }
- }
-
- return currentSolutionBs;
- }
-
- @Override
- public void remove() throws QueryEvaluationException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws QueryEvaluationException {
- scan.close();
- }
-
- };
-
-
- } else {
-
- return new CloseableIteration<BindingSet, QueryEvaluationException>() {
-
- @Override
- public void remove() throws QueryEvaluationException {
- throw new UnsupportedOperationException();
- }
-
- private Iterator<Entry<Key, Value>> intersections = null;
- private QueryBindingSet currentSolutionBs = null;
- private boolean hasNextCalled = false;
- private boolean isEmpty = false;
- private boolean init = false;
- private BindingSet currentBs;
- private StarQuery sq = new StarQuery(starQ);
- private Set<Range> emptyRangeSet = Sets.newHashSet();
- private BatchScanner scan;
-
- @Override
- public BindingSet next() throws QueryEvaluationException {
- if (hasNextCalled) {
- hasNextCalled = false;
- } else if (isEmpty) {
- throw new NoSuchElementException();
- } else {
- if (this.hasNext()) {
- hasNextCalled = false;
- } else {
- throw new NoSuchElementException();
- }
- }
- return currentSolutionBs;
- }
-
- @Override
- public boolean hasNext() throws QueryEvaluationException {
-
- if (!init) {
- if (intersections == null && bs.hasNext()) {
- currentBs = bs.next();
- sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
- scan = runQuery(sq,emptyRangeSet);
- intersections = scan.iterator();
- // binding set empty
- } else if (intersections == null && !bs.hasNext()) {
- currentBs = new QueryBindingSet();
- scan = runQuery(starQ,emptyRangeSet);
- intersections = scan.iterator();
- }
-
- init = true;
- }
-
- if (!hasNextCalled && !isEmpty) {
- while (intersections.hasNext() || bs.hasNext()) {
- if (!intersections.hasNext()) {
- scan.close();
- currentBs = bs.next();
- sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
- scan = runQuery(sq,emptyRangeSet);
- intersections = scan.iterator();
- }
- if (intersections.hasNext()) {
- currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs,
- unCommonVarNames);
- } else {
- continue;
- }
-
- if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) {
- hasNextCalled = true;
- return true;
- } else if(currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) {
- hasNextCalled = true;
- return true;
- }
- }
-
- isEmpty = true;
- return false;
-
- } else if (isEmpty) {
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public void close() throws QueryEvaluationException {
- scan.close();
- }
- };
- }
- }
-
- private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar) {
-
-
- QueryBindingSet currentSolutionBs = new QueryBindingSet();
-
- Text row = key.getRow();
- Text cq = key.getColumnQualifier();
-
-
- String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);
-
- boolean commonVarSet = false;
-
- //if common Var is constant there is no common variable to assign a value to
- if(sq.commonVarConstant()) {
- commonVarSet = true;
- }
-
- if (!commonVarSet && sq.isCommonVarURI()) {
- RyaURI rURI = new RyaURI(row.toString());
- currentSolutionBs.addBinding(sq.getCommonVarName(),
- RyaToRdfConversions.convertValue(rURI));
- commonVarSet = true;
- }
-
- for (String s : sq.getUnCommonVars()) {
-
- byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes();
- int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE);
- int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE);
- int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE);
- byte[] tripleComponent = Arrays.copyOfRange(cqBytes, firstIndex + 1, secondIndex);
- byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex);
- byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length);
-
- if (new String(tripleComponent).equals("object")) {
- byte[] object = Bytes.concat(cqContent, objType);
- org.openrdf.model.Value v = null;
- try {
- v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(
- object));
- } catch (RyaTypeResolverException e) {
- e.printStackTrace();
- }
- currentSolutionBs.addBinding(s, v);
-
- } else if (new String(tripleComponent).equals("subject")) {
- if (!commonVarSet) {
- byte[] object = Bytes.concat(row.getBytes(), objType);
- org.openrdf.model.Value v = null;
- try {
- v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(
- object));
- } catch (RyaTypeResolverException e) {
- e.printStackTrace();
- }
- currentSolutionBs.addBinding(sq.getCommonVarName(), v);
- commonVarSet = true;
- }
- RyaURI rURI = new RyaURI(new String(cqContent));
- currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI));
- } else {
- throw new IllegalArgumentException("Invalid row.");
- }
- }
- for (String s : unCommonVar) {
- currentSolutionBs.addBinding(s, currentBs.getValue(s));
- }
- return currentSolutionBs;
- }
-
- private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException {
-
- try {
- if (ranges.size() == 0) {
- String rangeText = query.getCommonVarValue();
- Range r;
- if (rangeText != null) {
- r = new Range(new Text(query.getCommonVarValue()));
- } else {
- r = new Range();
- }
- ranges = Collections.singleton(r);
- }
-
- Connector accCon = ConfigUtils.getConnector(conf);
- IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class);
-
- DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond());
-
- if (query.hasContext()) {
- DocumentIndexIntersectingIterator.setContext(is, query.getContextURI());
- }
- bs = accCon.createBatchScanner(EntityCentricIndex.getTableName(conf),
- new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15);
- bs.addScanIterator(is);
- bs.setRanges(ranges);
-
- return bs;
-
- } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
- throw new QueryEvaluationException(e);
- }
- }
-
- @Override
- public void close() throws IOException {
- //TODO generate an exception when BS passed in -- scanner closed
-// if (bs != null) {
-// bs.close();
-// }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java
deleted file mode 100644
index 9a9daa5..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java
+++ /dev/null
@@ -1,327 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
-import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.openrdf.model.URI;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Bytes;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-
-public class EntityCentricIndex extends AbstractAccumuloIndexer {
-
- private static final Logger logger = Logger.getLogger(EntityCentricIndex.class);
- private static final String TABLE_SUFFIX = "EntityCentricIndex";
-
- private AccumuloRdfConfiguration conf;
- private BatchWriter writer;
- private boolean isInit = false;
-
- private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException,
- TableExistsException {
- ConfigUtils.createTableIfNotExists(conf, getTableName());
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- //initialization occurs in setConf because index is created using reflection
- @Override
- public void setConf(final Configuration conf) {
- if (conf instanceof AccumuloRdfConfiguration) {
- this.conf = (AccumuloRdfConfiguration) conf;
- } else {
- this.conf = new AccumuloRdfConfiguration(conf);
- }
- if (!isInit) {
- try {
- initInternal();
- isInit = true;
- } catch (final AccumuloException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (final AccumuloSecurityException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (final TableNotFoundException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (final TableExistsException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (final IOException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Get the Accumulo table used by this index.
- * @return table used by instances of this index
- */
- @Override
- public String getTableName() {
- return getTableName(conf);
- }
-
- /**
- * Get the Accumulo table that will be used by this index.
- * @param conf
- * @return table name guaranteed to be used by instances of this index
- */
- public static String getTableName(Configuration conf) {
- return ConfigUtils.getTablePrefix(conf) + TABLE_SUFFIX;
- }
-
- @Override
- public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException {
- try {
- this.writer = writer.getBatchWriter(getTableName());
- } catch (final AccumuloException e) {
- throw new IOException(e);
- } catch (final AccumuloSecurityException e) {
- throw new IOException(e);
- } catch (final TableNotFoundException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void storeStatement(final RyaStatement stmt) throws IOException {
- Preconditions.checkNotNull(writer, "BatchWriter not Set");
- try {
- for (final TripleRow row : serializeStatement(stmt)) {
- writer.addMutation(createMutation(row));
- }
- } catch (final MutationsRejectedException e) {
- throw new IOException(e);
- } catch (final RyaTypeResolverException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void deleteStatement(final RyaStatement stmt) throws IOException {
- Preconditions.checkNotNull(writer, "BatchWriter not Set");
- try {
- for (final TripleRow row : serializeStatement(stmt)) {
- writer.addMutation(deleteMutation(row));
- }
- } catch (final MutationsRejectedException e) {
- throw new IOException(e);
- } catch (final RyaTypeResolverException e) {
- throw new IOException(e);
- }
- }
-
- protected Mutation deleteMutation(final TripleRow tripleRow) {
- final Mutation m = new Mutation(new Text(tripleRow.getRow()));
-
- final byte[] columnFamily = tripleRow.getColumnFamily();
- final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
-
- final byte[] columnQualifier = tripleRow.getColumnQualifier();
- final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
-
- final byte[] columnVisibility = tripleRow.getColumnVisibility();
- final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
-
- m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp());
- return m;
- }
-
- public static Collection<Mutation> createMutations(final RyaStatement stmt) throws RyaTypeResolverException{
- final Collection<Mutation> m = Lists.newArrayList();
- for (final TripleRow tr : serializeStatement(stmt)){
- m.add(createMutation(tr));
- }
- return m;
- }
-
- private static Mutation createMutation(final TripleRow tripleRow) {
- final Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
- final byte[] columnVisibility = tripleRow.getColumnVisibility();
- final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
- final Long timestamp = tripleRow.getTimestamp();
- final byte[] value = tripleRow.getValue();
- final Value v = value == null ? EMPTY_VALUE : new Value(value);
- final byte[] columnQualifier = tripleRow.getColumnQualifier();
- final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
- final byte[] columnFamily = tripleRow.getColumnFamily();
- final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
-
- mutation.put(cfText, cqText, cv, timestamp, v);
- return mutation;
- }
-
- private static List<TripleRow> serializeStatement(final RyaStatement stmt) throws RyaTypeResolverException {
- final RyaURI subject = stmt.getSubject();
- final RyaURI predicate = stmt.getPredicate();
- final RyaType object = stmt.getObject();
- final RyaURI context = stmt.getContext();
- final Long timestamp = stmt.getTimestamp();
- final byte[] columnVisibility = stmt.getColumnVisibility();
- final byte[] value = stmt.getValue();
- assert subject != null && predicate != null && object != null;
- final byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes();
- final byte[] subjBytes = subject.getData().getBytes();
- final byte[] predBytes = predicate.getData().getBytes();
- final byte[][] objBytes = RyaContext.getInstance().serializeType(object);
-
- return Lists.newArrayList(new TripleRow(subjBytes,
- predBytes,
- Bytes.concat(cf, DELIM_BYTES,
- "object".getBytes(), DELIM_BYTES,
- objBytes[0], objBytes[1]),
- timestamp,
- columnVisibility,
- value),
- new TripleRow(objBytes[0],
- predBytes,
- Bytes.concat(cf, DELIM_BYTES,
- "subject".getBytes(), DELIM_BYTES,
- subjBytes, objBytes[1]),
- timestamp,
- columnVisibility,
- value));
- }
-
- /**
- * Deserialize a row from the entity-centric index.
- * @param key Row key, contains statement data
- * @param value Row value
- * @return The statement represented by the row
- * @throws IOException if edge direction can't be extracted as expected.
- * @throws RyaTypeResolverException if a type error occurs deserializing the statement's object.
- */
- public static RyaStatement deserializeStatement(Key key, Value value) throws RyaTypeResolverException, IOException {
- assert key != null;
- assert value != null;
- byte[] entityBytes = key.getRowData().toArray();
- byte[] predicateBytes = key.getColumnFamilyData().toArray();
- byte[] data = key.getColumnQualifierData().toArray();
- long timestamp = key.getTimestamp();
- byte[] columnVisibility = key.getColumnVisibilityData().toArray();
- byte[] valueBytes = value.get();
-
- // main entity is either the subject or object
- // data contains: column family , var name of other node , data of other node + datatype of object
- int split = Bytes.indexOf(data, DELIM_BYTES);
- byte[] columnFamily = Arrays.copyOf(data, split);
- byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
- split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
- String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
- byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length - 2);
- byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length);
- byte[] objectBytes;
- RyaURI subject;
- RyaURI predicate = new RyaURI(new String(predicateBytes));
- RyaType object;
- RyaURI context = null;
- // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker}
- // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker}
- switch (otherNodeVar) {
- case "subject":
- subject = new RyaURI(new String(otherNodeBytes));
- objectBytes = Bytes.concat(entityBytes, typeBytes);
- break;
- case "object":
- subject = new RyaURI(new String(entityBytes));
- objectBytes = Bytes.concat(otherNodeBytes, typeBytes);
- break;
- default:
- throw new IOException("Failed to deserialize entity-centric index row. "
- + "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'");
- }
- object = RyaContext.getInstance().deserialize(objectBytes);
- if (columnFamily != null && columnFamily.length > 0) {
- context = new RyaURI(new String(columnFamily));
- }
- return new RyaStatement(subject, predicate, object, context,
- null, columnVisibility, valueBytes, timestamp);
- }
-
- @Override
- public void init() {
- }
-
- @Override
- public void setConnector(final Connector connector) {
- }
-
- @Override
- public void destroy() {
- }
-
- @Override
- public void purge(final RdfCloudTripleStoreConfiguration configuration) {
- }
-
- @Override
- public void dropAndDestroy() {
- }
-
- @Override
- public Set<URI> getIndexablePredicates() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
deleted file mode 100644
index 2030e58..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package mvm.rya.indexing.accumulo.entity;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-
-public class EntityLocalityGroupSetter {
-
-
- String tablePrefix;
- Connector conn;
- Configuration conf;
-
- public EntityLocalityGroupSetter(String tablePrefix, Connector conn, Configuration conf) {
- this.conn = conn;
- this.tablePrefix = tablePrefix;
- this.conf = conf;
- }
-
-
-
- private Iterator<String> getPredicates() {
-
- String auths = conf.get(ConfigUtils.CLOUDBASE_AUTHS);
- BatchScanner bs = null;
- try {
- bs = conn.createBatchScanner(tablePrefix + "prospects", new Authorizations(auths), 10);
- } catch (TableNotFoundException e) {
- e.printStackTrace();
- }
- bs.setRanges(Collections.singleton(Range.prefix(new Text("predicate" + "\u0000"))));
- final Iterator<Entry<Key,Value>> iter = bs.iterator();
-
- return new Iterator<String>() {
-
- private String next = null;
- private boolean hasNextCalled = false;
- private boolean isEmpty = false;
-
- @Override
- public boolean hasNext() {
-
- if (!hasNextCalled && !isEmpty) {
- while (iter.hasNext()) {
- Entry<Key,Value> temp = iter.next();
- String row = temp.getKey().getRow().toString();
- String[] rowArray = row.split("\u0000");
- next = rowArray[1];
-
- hasNextCalled = true;
- return true;
- }
- isEmpty = true;
- return false;
- } else if(isEmpty) {
- return false;
- }else {
- return true;
- }
- }
-
- @Override
- public String next() {
-
- if (hasNextCalled) {
- hasNextCalled = false;
- return next;
- } else if(isEmpty) {
- throw new NoSuchElementException();
- }else {
- if (this.hasNext()) {
- hasNextCalled = false;
- return next;
- } else {
- throw new NoSuchElementException();
- }
- }
- }
-
- @Override
- public void remove() {
-
- throw new UnsupportedOperationException("Cannot delete from iterator!");
-
- }
-
- };
- }
-
-
-
-
-
-
-
-
- public void setLocalityGroups() {
-
- HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>();
- Iterator<String> groups = getPredicates();
-
- int i = 1;
-
- while(groups.hasNext()) {
- HashSet<Text> tempColumn = new HashSet<Text>();
- String temp = groups.next();
- tempColumn.add(new Text(temp));
- String groupName = "predicate" + i;
- localityGroups.put(groupName, tempColumn);
- i++;
- }
-
-
- try {
- conn.tableOperations().setLocalityGroups(tablePrefix + "doc_partitioned_index", localityGroups);
- //conn.tableOperations().compact(tablePrefix + "doc_partitioned_index", null, null, true, true);
- } catch (AccumuloException e) {
- e.printStackTrace();
- } catch (AccumuloSecurityException e) {
- e.printStackTrace();
- } catch (TableNotFoundException e) {
- e.printStackTrace();
- }
-
-
-
- }
-
-
-
-
-
-
-
-}