You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by dl...@apache.org on 2017/08/30 20:31:47 UTC
[09/14] incubator-rya git commit: RYA-324,
RYA-272 Geo refactoring and examples closes #182
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
new file mode 100644
index 0000000..9c13c8b
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.entity.model.TypedEntity;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBObject;
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+
+/**
+ * A Mongo DB backed implementation of {@link EventStorage}.
+ * <p>
+ * Every {@link Event} is stored as a single document in the
+ * {@value #COLLECTION_NAME} collection of the database named after the
+ * Rya instance.
+ */
+public class MongoEventStorage implements EventStorage {
+
+    /** Name of the Mongo collection the {@link Event} documents are stored in. */
+    protected static final String COLLECTION_NAME = "geotemporal-events";
+
+    /** Converts {@link Event}s to and from Mongo {@link Document}s. */
+    private static final EventDocumentConverter EVENT_CONVERTER = new EventDocumentConverter();
+
+    /**
+     * A client connected to the Mongo instance that hosts the Rya instance.
+     */
+    protected final MongoClient mongo;
+
+    /**
+     * The name of the Rya instance; it is used as the name of the Mongo database.
+     */
+    protected final String ryaInstanceName;
+
+    /*
+     * Used to get the filter query objects.
+     */
+    private final GeoTemporalMongoDBStorageStrategy queryAdapter;
+
+    /**
+     * Constructs an instance of {@link MongoEventStorage}.
+     *
+     * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null)
+     * @param ryaInstanceName - The name of the Rya instance the {@link Event}s are for. (not null)
+     */
+    public MongoEventStorage(final MongoClient mongo, final String ryaInstanceName) {
+        this.mongo = requireNonNull(mongo);
+        this.ryaInstanceName = requireNonNull(ryaInstanceName);
+        queryAdapter = new GeoTemporalMongoDBStorageStrategy();
+    }
+
+    /**
+     * Inserts a new document for the {@link Event}.
+     *
+     * @param event - The event to store. (not null)
+     * @throws EventAlreadyExistsException Mongo reported a duplicate key for the insert.
+     * @throws EventStorageException Any other Mongo failure.
+     */
+    @Override
+    public void create(final Event event) throws EventStorageException {
+        requireNonNull(event);
+
+        try {
+            mongo.getDatabase(ryaInstanceName)
+                .getCollection(COLLECTION_NAME)
+                .insertOne(EVENT_CONVERTER.toDocument(event));
+        } catch(final MongoException e) {
+            final ErrorCategory category = ErrorCategory.fromErrorCode( e.getCode() );
+            if(category == ErrorCategory.DUPLICATE_KEY) {
+                throw new EventAlreadyExistsException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e);
+            }
+            throw new EventStorageException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Looks up the {@link Event} stored for a Subject.
+     *
+     * @param subject - The Subject of the event to fetch. (not null)
+     * @return The stored event, or empty when no document matches the Subject.
+     * @throws EventStorageException The query failed or the document could not be converted.
+     */
+    @Override
+    public Optional<Event> get(final RyaURI subject) throws EventStorageException {
+        requireNonNull(subject);
+
+        try {
+            final Document document = mongo.getDatabase(ryaInstanceName)
+                .getCollection(COLLECTION_NAME)
+                .find( makeSubjectFilter(subject) )
+                .first();
+
+            return document == null ?
+                    Optional.empty() :
+                    Optional.of( EVENT_CONVERTER.fromDocument(document) );
+
+        } catch(final MongoException | DocumentConverterException e) {
+            throw new EventStorageException("Could not get the Event with Subject '" + subject.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Finds every stored {@link Event} that matches the supplied filters.
+     *
+     * @param subject - When present, only the event with this Subject may match. (not null)
+     * @param geoFilters - Geo filter expressions; an absent value is treated as no geo filters. (not null)
+     * @param temporalFilters - Temporal filter expressions; an absent value is treated as no temporal filters. (not null)
+     * @return The matching events; empty when nothing matches.
+     * @throws EventStorageException The filter could not be built, the query failed, or a
+     *   document could not be converted.
+     */
+    @Override
+    public Collection<Event> search(final Optional<RyaURI> subject, final Optional<Collection<IndexingExpr>> geoFilters, final Optional<Collection<IndexingExpr>> temporalFilters) throws EventStorageException {
+        requireNonNull(subject);
+        requireNonNull(geoFilters);
+        requireNonNull(temporalFilters);
+
+        try {
+            final Collection<IndexingExpr> geos = geoFilters.orElseGet(ArrayList::new);
+            final Collection<IndexingExpr> tempos = temporalFilters.orElseGet(ArrayList::new);
+            final DBObject filterObj = queryAdapter.getFilterQuery(geos, tempos);
+
+            final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(filterObj.toMap());
+            if(subject.isPresent()) {
+                builder.append(EventDocumentConverter.SUBJECT, subject.get().getData());
+            }
+
+            final List<Event> events = new ArrayList<>();
+            // The cursor holds server side resources, so it is always closed, even
+            // when converting one of the documents fails.
+            try(final MongoCursor<Document> results = mongo.getDatabase(ryaInstanceName)
+                    .getCollection(COLLECTION_NAME)
+                    .find( BsonDocument.parse(builder.get().toString()) )
+                    .iterator()) {
+                while(results.hasNext()) {
+                    events.add(EVENT_CONVERTER.fromDocument(results.next()));
+                }
+            }
+            return events;
+        } catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) {
+            throw new EventStorageException("Could not get the Event.", e);
+        }
+    }
+
+    /**
+     * Replaces the stored document of an {@link Event} with an updated version.
+     *
+     * @param old - The event that is being replaced; only its Subject is used to find the document. (not null)
+     * @param updated - The new state of the event; must have the same Subject as {@code old}. (not null)
+     * @throws StaleUpdateException No stored document matched the Subject, so nothing was replaced.
+     * @throws EventStorageException The Subjects differ or Mongo failed.
+     */
+    @Override
+    public void update(final Event old, final Event updated) throws StaleUpdateException, EventStorageException {
+        requireNonNull(old);
+        requireNonNull(updated);
+
+        // The updated entity must have the same Subject as the one it is replacing.
+        if(!old.getSubject().equals(updated.getSubject())) {
+            throw new EventStorageException("The old Event and the updated Event must have the same Subject. " +
+                    "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData());
+        }
+
+        // Must match the old entity's Subject.
+        final Bson oldEntityFilter = makeSubjectFilter(old.getSubject());
+        final Document updatedDoc = EVENT_CONVERTER.toDocument(updated);
+
+        try {
+            // Do a find and replace.
+            final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME);
+            if(collection.findOneAndReplace(oldEntityFilter, updatedDoc) == null) {
+                throw new StaleUpdateException("Could not update the Event with Subject '" + updated.getSubject().getData() + "'.");
+            }
+        } catch(final MongoException e) {
+            // Report driver failures the same way the other operations of this class do.
+            throw new EventStorageException("Could not update the Event with Subject '" + updated.getSubject().getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Deletes the {@link Event} stored for a Subject.
+     *
+     * @param subject - The Subject of the event to delete. (not null)
+     * @return {@code true} when a document was deleted; {@code false} when none matched.
+     * @throws EventStorageException Mongo failed.
+     */
+    @Override
+    public boolean delete(final RyaURI subject) throws EventStorageException {
+        requireNonNull(subject);
+
+        try {
+            final Document deleted = mongo.getDatabase(ryaInstanceName)
+                .getCollection(COLLECTION_NAME)
+                .findOneAndDelete( makeSubjectFilter(subject) );
+
+            return deleted != null;
+
+        } catch(final MongoException e) {
+            throw new EventStorageException("Could not delete the Event with Subject '" + subject.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Builds the filter that matches the document of a single Subject.
+     *
+     * @param subject - The Subject to match. (not null)
+     * @return An equality filter on the Subject field.
+     */
+    private static Bson makeSubjectFilter(final RyaURI subject) {
+        return Filters.eq(EventDocumentConverter.SUBJECT, subject.getData());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
new file mode 100644
index 0000000..2561c23
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.mongodb.AbstractMongoIndexer;
+import org.apache.rya.indexing.mongodb.IndexingException;
+import org.apache.rya.indexing.mongodb.geo.GmlParser;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+
+/**
+ * Indexer that stores 2 separate statements as one 'Event' entity.
+ * <p>
+ * The statements are required to have the same subject, one must be
+ * a Geo based statement and the other a temporal based statement.
+ * <p>
+ * This indexer is later used when querying for geo/temporal statements
+ * in the format of:
+ * <pre>
+ * {@code
+ * QUERY PARAMS
+ * ?SomeSubject geo:predicate ?Location
+ * ?SomeSubject time:predicate ?Time
+ * Filter(?Location, geoFunction())
+ * Filter(?Time, temporalFunction())
+ * }
+ * </pre>
+ *
+ * The number of filters is not strict, but there must be at least one
+ * query pattern for geo and one for temporal as well as at least one
+ * filter for each type.
+ */
+public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMongoDBStorageStrategy> implements GeoTemporalIndexer {
+ private static final Logger LOG = Logger.getLogger(MongoGeoTemporalIndexer.class);
+ public static final String GEO_TEMPORAL_COLLECTION = "geo_temporal";
+
+ private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>();
+ private final AtomicReference<EventStorage> events = new AtomicReference<>();
+
+ @Override
+ public void init() {
+ initCore();
+ predicates = ConfigUtils.getGeoPredicates(conf);
+ predicates.addAll(ConfigUtils.getTemporalPredicates(conf));
+ storageStrategy = new GeoTemporalMongoDBStorageStrategy();
+ }
+
+ @Override
+ public void setConf(final Configuration conf) {
+ requireNonNull(conf);
+ events.set(null);
+ events.set(getEventStorage(conf));
+ super.conf = conf;
+ configuration.set(new MongoDBRdfConfiguration(conf));
+ }
+
+ @Override
+ public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+ requireNonNull(ryaStatement);
+
+ try {
+ updateEvent(ryaStatement.getSubject(), ryaStatement);
+ } catch (IndexingException | ParseException e) {
+ throw new IOException("Failed to update the Entity index.", e);
+ }
+ }
+
+ @Override
+ public void deleteStatement(final RyaStatement statement) throws IOException {
+ requireNonNull(statement);
+ final RyaURI subject = statement.getSubject();
+ try {
+ final EventStorage eventStore = events.get();
+ checkState(events != null, "Must set this indexers configuration before storing statements.");
+
+ new EventUpdater(eventStore).update(subject, old -> {
+ final Event.Builder updated;
+ if(!old.isPresent()) {
+ return Optional.empty();
+ } else {
+ updated = Event.builder(old.get());
+ }
+
+ final Event currentEvent = updated.build();
+ final URI pred = statement.getObject().getDataType();
+ if((pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) ||
+ pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML))
+ && currentEvent.getGeometry().isPresent()) {
+ //is geo and needs to be removed.
+ try {
+ if(currentEvent.getGeometry().get().equals(GeoParseUtils.getGeometry(RyaToRdfConversions.convertStatement(statement), new GmlParser()))) {
+ updated.setGeometry(null);
+ }
+ } catch (final Exception e) {
+ LOG.debug("Unable to parse the stored geometry.");
+ }
+ } else {
+ //is time
+ final String dateTime = statement.getObject().getData();
+ final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+ if (matcher.find()) {
+ final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+ if(currentEvent.getInterval().get().equals(interval)) {
+ updated.setTemporalInterval(null);
+ }
+ } else {
+ final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+ if(currentEvent.getInstant().get().equals(instant)) {
+ updated.setTemporalInstant(null);
+ }
+ }
+ }
+ return Optional.of(updated.build());
+ });
+ } catch (final IndexingException e) {
+ throw new IOException("Failed to update the Entity index.", e);
+ }
+ }
+
+ private void updateEvent(final RyaURI subject, final RyaStatement statement) throws IndexingException, ParseException {
+ final EventStorage eventStore = events.get();
+ checkState(events != null, "Must set this indexers configuration before storing statements.");
+
+ new EventUpdater(eventStore).update(subject, old -> {
+ final Event.Builder updated;
+ if(!old.isPresent()) {
+ updated = Event.builder()
+ .setSubject(subject);
+ } else {
+ updated = Event.builder(old.get());
+ }
+
+ final URI pred = statement.getObject().getDataType();
+ if(pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) ||
+ pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML)) {
+ //is geo
+ try {
+ final Statement geoStatement = RyaToRdfConversions.convertStatement(statement);
+ final Geometry geometry = GeoParseUtils.getGeometry(geoStatement, new GmlParser());
+ updated.setGeometry(geometry);
+ } catch (final ParseException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ } else {
+ //is time
+ final String dateTime = statement.getObject().getData();
+ final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+ if (matcher.find()) {
+ final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+ updated.setTemporalInterval(interval);
+ } else {
+ final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+ updated.setTemporalInstant(instant);
+ }
+ }
+ return Optional.of(updated.build());
+ });
+ }
+
+ @Override
+ public String getCollectionName() {
+ return ConfigUtils.getTablePrefix(conf) + GEO_TEMPORAL_COLLECTION;
+ }
+
+ @Override
+ public EventStorage getEventStorage(final Configuration conf) {
+ requireNonNull(conf);
+
+ if(events.get() != null) {
+ return events.get();
+ }
+
+
+ final MongoDBRdfConfiguration mongoConf = new MongoDBRdfConfiguration(conf);
+ mongoClient = mongoConf.getMongoClient();
+ configuration.set(mongoConf);
+ if (mongoClient == null) {
+ mongoClient = MongoConnectorFactory.getMongoClient(conf);
+ }
+ final String ryaInstanceName = mongoConf.getMongoDBName();
+ events.set(new MongoEventStorage(mongoClient, ryaInstanceName));
+ return events.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
new file mode 100644
index 0000000..634359f
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
@@ -0,0 +1,247 @@
+package org.apache.rya.indexing.mongodb.geo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
+import org.bson.Document;
+import org.openrdf.model.Statement;
+import org.openrdf.query.MalformedQueryException;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.Polygon;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+/**
+ * Storage strategy that serializes geo statements into Mongo documents with
+ * a GeoJSON {@code location} field and builds the Mongo queries used to
+ * search that field.
+ */
+public class GeoMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
+    private static final Logger LOG = Logger.getLogger(GeoMongoDBStorageStrategy.class);
+
+    /** Name of the document field that holds the GeoJSON geometry. */
+    private static final String GEO = "location";
+
+    /**
+     * The kinds of geo queries that can be built, each with the Mongo
+     * operator keyword it reports.
+     */
+    public enum GeoQueryType {
+        INTERSECTS {
+            @Override
+            public String getKeyword() {
+                return "$geoIntersects";
+            }
+        }, WITHIN {
+            @Override
+            public String getKeyword() {
+                return "$geoWithin";
+            }
+        }, EQUALS {
+            @Override
+            public String getKeyword() {
+                return "$near";
+            }
+        }, NEAR {
+            @Override
+            public String getKeyword() {
+                return "$near";
+            }
+        };
+
+        /**
+         * @return The Mongo operator keyword of this query type.
+         */
+        public abstract String getKeyword();
+    }
+
+    /**
+     * The parameters of a single geo query: its type, its geometry and, for
+     * near queries, the optional distance bounds.
+     */
+    public static class GeoQuery {
+        private final GeoQueryType queryType;
+        private final Geometry geo;
+
+        private final Double maxDistance;
+        private final Double minDistance;
+
+        /**
+         * Creates a query without distance bounds (both distances are 0).
+         *
+         * @param queryType - The kind of query.
+         * @param geo - The geometry to query with.
+         */
+        public GeoQuery(final GeoQueryType queryType, final Geometry geo) {
+            this(queryType, geo, 0, 0);
+        }
+
+        /**
+         * Creates a query with distance bounds.
+         *
+         * @param queryType - The kind of query.
+         * @param geo - The geometry to query with.
+         * @param maxDistance - The maximum distance; 0 means no maximum is applied.
+         * @param minDistance - The minimum distance; 0 means no minimum is applied.
+         */
+        public GeoQuery(final GeoQueryType queryType, final Geometry geo, final double maxDistance,
+                final double minDistance) {
+            this.queryType = queryType;
+            this.geo = geo;
+            this.maxDistance = maxDistance;
+            this.minDistance = minDistance;
+        }
+
+        public GeoQueryType getQueryType() {
+            return queryType;
+        }
+
+        public Geometry getGeo() {
+            return geo;
+        }
+
+        public Double getMaxDistance() {
+            return maxDistance;
+        }
+
+        public Double getMinDistance() {
+            return minDistance;
+        }
+    }
+
+    /** Radius used for the {@code $centerSphere} of single point EQUALS queries. */
+    private final Double maxDistance;
+
+    /**
+     * @param maxDistance - The radius used when an EQUALS query is made for a single point.
+     */
+    public GeoMongoDBStorageStrategy(final Double maxDistance) {
+        this.maxDistance = maxDistance;
+    }
+
+    /**
+     * Creates a {@code 2dsphere} index on the geometry field.
+     *
+     * @param coll - The collection to index.
+     */
+    @Override
+    public void createIndices(final DBCollection coll){
+        coll.createIndex(new BasicDBObject(GEO, "2dsphere"));
+    }
+
+    /**
+     * Builds the Mongo query for a {@link GeoQuery}.
+     *
+     * @param queryObj - The query parameters.
+     * @return The Mongo query object.
+     * @throws MalformedQueryException A WITHIN query was not given a {@link Polygon}
+     *   or a NEAR query was not given a {@link Point}.
+     */
+    public DBObject getQuery(final GeoQuery queryObj) throws MalformedQueryException {
+        final Geometry geo = queryObj.getGeo();
+        final GeoQueryType queryType = queryObj.getQueryType();
+        if (queryType == GeoQueryType.WITHIN && !(geo instanceof Polygon)) {
+            //They can also be applied to MultiPolygons, but those are not supported either.
+            throw new MalformedQueryException("Mongo Within operations can only be performed on Polygons.");
+        } else if(queryType == GeoQueryType.NEAR && !(geo instanceof Point)) {
+            //Near queries are only supported around a single Point.
+            throw new MalformedQueryException("Mongo near operations can only be performed on Points.");
+        }
+
+        final BasicDBObject query;
+        if (queryType == GeoQueryType.EQUALS) {
+            if(geo.getNumPoints() == 1) {
+                // A single point is matched by searching the circle of radius
+                // maxDistance around it.
+                final List<Object> circle = new ArrayList<>();
+                circle.add(getPoint(geo));
+                circle.add(maxDistance);
+                final BasicDBObject polygon = new BasicDBObject("$centerSphere", circle);
+                query = new BasicDBObject(GEO, new BasicDBObject(GeoQueryType.WITHIN.getKeyword(), polygon));
+            } else {
+                query = new BasicDBObject(GEO, getCorrespondingPoints(geo));
+            }
+        } else if(queryType == GeoQueryType.NEAR) {
+            final BasicDBObject geoDoc = new BasicDBObject("$geometry", getDBPoint(geo));
+            // A distance of 0 means the bound was not requested.
+            if(queryObj.getMaxDistance() != 0) {
+                geoDoc.append("$maxDistance", queryObj.getMaxDistance());
+            }
+
+            if(queryObj.getMinDistance() != 0) {
+                geoDoc.append("$minDistance", queryObj.getMinDistance());
+            }
+            query = new BasicDBObject(GEO, new BasicDBObject(queryType.getKeyword(), geoDoc));
+        } else {
+            final BasicDBObject geoDoc = new BasicDBObject("$geometry", getCorrespondingPoints(geo));
+            query = new BasicDBObject(GEO, new BasicDBObject(queryType.getKeyword(), geoDoc));
+        }
+
+        return query;
+    }
+
+    /**
+     * Serializes a statement and appends the GeoJSON form of its WKT geometry.
+     *
+     * @param ryaStatement - The statement to serialize.
+     * @return The document to store, or {@code null} when the geometry could not be parsed.
+     */
+    @Override
+    public DBObject serialize(final RyaStatement ryaStatement) {
+        // if the object is wkt, then try to index it
+        // write the statement data to the fields
+        try {
+            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+            final Geometry geo = (new WKTReader()).read(GeoParseUtils.getWellKnownText(statement));
+            if(geo == null) {
+                LOG.error("Failed to parse geo statement: " + statement.toString());
+                return null;
+            }
+            final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement);
+            if (geo.getNumPoints() > 1) {
+                base.append(GEO, getCorrespondingPoints(geo));
+            } else {
+                base.append(GEO, getDBPoint(geo));
+            }
+            return base;
+        } catch(final ParseException e) {
+            LOG.error("Could not create geometry for statement " + ryaStatement, e);
+            return null;
+        }
+    }
+
+    /**
+     * Converts a multi point geometry to a GeoJSON document. A {@link Polygon}
+     * becomes a GeoJSON {@code Polygon}; every other geometry is written as a
+     * {@code LineString} of its coordinates.
+     *
+     * @param geo - The geometry to convert.
+     * @return The GeoJSON document.
+     */
+    public Document getCorrespondingPoints(final Geometry geo) {
+        //Polygons must be a 3 dimensional array.
+
+        //polygons must be a closed loop
+        final Document geoDoc = new Document();
+        if (geo instanceof Polygon) {
+            final Polygon poly = (Polygon) geo;
+            final List<List<List<Double>>> DBpoints = new ArrayList<>();
+
+            // outer shell of the polygon
+            final List<List<Double>> ring = new ArrayList<>();
+            for (final Coordinate coord : poly.getExteriorRing().getCoordinates()) {
+                ring.add(getPoint(coord));
+            }
+            DBpoints.add(ring);
+
+            // each hole in the polygon
+            for (int ii = 0; ii < poly.getNumInteriorRing(); ii++) {
+                final List<List<Double>> holeCoords = new ArrayList<>();
+                for (final Coordinate coord : poly.getInteriorRingN(ii).getCoordinates()) {
+                    holeCoords.add(getPoint(coord));
+                }
+                DBpoints.add(holeCoords);
+            }
+            geoDoc.append("coordinates", DBpoints)
+                  .append("type", "Polygon");
+        } else {
+            final List<List<Double>> points = getPoints(geo);
+            geoDoc.append("coordinates", points)
+                  .append("type", "LineString");
+        }
+        return geoDoc;
+    }
+
+    /**
+     * @param geo - The geometry whose coordinates are listed.
+     * @return One {@code [x, y]} pair per coordinate of the geometry.
+     */
+    private List<List<Double>> getPoints(final Geometry geo) {
+        final List<List<Double>> points = new ArrayList<>();
+        for (final Coordinate coord : geo.getCoordinates()) {
+            points.add(getPoint(coord));
+        }
+        return points;
+    }
+
+    /**
+     * Converts a geometry to a GeoJSON {@code Point} document.
+     *
+     * @param geo - The geometry to convert; the value of its {@code getCoordinate()} is used.
+     * @return The GeoJSON document.
+     */
+    public Document getDBPoint(final Geometry geo) {
+        return new Document()
+            .append("coordinates", getPoint(geo))
+            .append("type", "Point");
+    }
+
+    /**
+     * @param coord - The coordinate to convert.
+     * @return The coordinate as an {@code [x, y]} pair.
+     */
+    private List<Double> getPoint(final Coordinate coord) {
+        final List<Double> point = new ArrayList<>();
+        point.add(coord.x);
+        point.add(coord.y);
+        return point;
+    }
+
+    /**
+     * @param geo - The geometry to convert.
+     * @return The value of the geometry's {@code getCoordinate()} as an {@code [x, y]} pair.
+     */
+    private List<Double> getPoint(final Geometry geo) {
+        return getPoint(geo.getCoordinate());
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java
new file mode 100644
index 0000000..be5f1bc
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.geo;
+
+import java.io.IOException;
+import java.io.Reader;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser;
+import org.geotools.gml3.GMLConfiguration;
+import org.xml.sax.SAXException;
+
+import com.vividsolutions.jts.geom.Geometry;
+
+
+/**
+ * This wraps geotools parser for rya.geoCommon that cannot be dependent on geotools.
+ *
+ */
+public class GmlParser implements GmlToGeometryParser {
+
+    /**
+     * Parses the content of the reader with a GeoTools parser that uses the
+     * GML 3 {@link GMLConfiguration} and returns the result as a geometry.
+     *
+     * @param reader - The source of the GML to parse.
+     * @return The parse result cast to {@link Geometry}.
+     * @see org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser#parse(java.io.Reader)
+     */
+    @Override
+    public Geometry parse(Reader reader) throws IOException, SAXException, ParserConfigurationException {
+        final GMLConfiguration gml3 = new GMLConfiguration();
+        final Object parsed = new org.geotools.xml.Parser(gml3).parse(reader);
+        return (Geometry) parsed;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java
new file mode 100644
index 0000000..2abee76
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java
@@ -0,0 +1,154 @@
+package org.apache.rya.indexing.mongodb.geo;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.NEAR;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery;
+import org.apache.rya.indexing.mongodb.AbstractMongoIndexer;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.openrdf.model.Statement;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.mongodb.DBObject;
+import com.vividsolutions.jts.geom.Geometry;
+
+import info.aduna.iteration.CloseableIteration;
+
+public class MongoGeoIndexer extends AbstractMongoIndexer<GeoMongoDBStorageStrategy> implements GeoIndexer {
+ private static final String COLLECTION_SUFFIX = "geo";
+ private static final Logger logger = Logger.getLogger(MongoGeoIndexer.class);
+
+ @Override
+ public void init() {
+ initCore();
+ predicates = ConfigUtils.getGeoPredicates(conf);
+ if(predicates.size() == 0) {
+ logger.debug("No predicates specified for geo indexing. During insertion, all statements will be attempted to be indexed into the geo indexer.");
+ }
+ storageStrategy = new GeoMongoDBStorageStrategy(Double.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_GEO_MAXDISTANCE, "1e-10")));
+ storageStrategy.createIndices(collection);
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryEquals(
+ final Geometry query, final StatementConstraints constraints) {
+ try {
+ final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(EQUALS, query));
+ return withConstraints(constraints, queryObj);
+ } catch (final MalformedQueryException e) {
+ logger.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(
+ final Geometry query, final StatementConstraints constraints) {
+ throw new UnsupportedOperationException(
+ "Disjoint queries are not supported in Mongo DB.");
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(
+ final Geometry query, final StatementConstraints constraints) {
+ try {
+ final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(INTERSECTS, query));
+ return withConstraints(constraints, queryObj);
+ } catch (final MalformedQueryException e) {
+ logger.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryTouches(
+ final Geometry query, final StatementConstraints constraints) {
+ throw new UnsupportedOperationException(
+ "Touches queries are not supported in Mongo DB.");
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(
+ final Geometry query, final StatementConstraints constraints) {
+ throw new UnsupportedOperationException(
+ "Crosses queries are not supported in Mongo DB.");
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryWithin(
+ final Geometry query, final StatementConstraints constraints) {
+ try {
+ final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(WITHIN, query));
+ return withConstraints(constraints, queryObj);
+ } catch (final MalformedQueryException e) {
+ logger.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query, final StatementConstraints constraints) {
+ double maxDistance = 0;
+ double minDistance = 0;
+ if (query.getMaxDistance().isPresent()) {
+ maxDistance = query.getMaxDistance().get();
+ }
+
+ if (query.getMinDistance().isPresent()) {
+ minDistance = query.getMinDistance().get();
+ }
+
+ try {
+ final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(NEAR, query.getGeometry(), maxDistance, minDistance));
+ return withConstraints(constraints, queryObj);
+ } catch (final MalformedQueryException e) {
+ logger.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryContains(
+ final Geometry query, final StatementConstraints constraints) {
+ throw new UnsupportedOperationException(
+ "Contains queries are not supported in Mongo DB.");
+ }
+
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(
+ final Geometry query, final StatementConstraints constraints) {
+ throw new UnsupportedOperationException(
+ "Overlaps queries are not supported in Mongo DB.");
+ }
+
+ @Override
+ public String getCollectionName() {
+ return ConfigUtils.getTablePrefix(conf) + COLLECTION_SUFFIX;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java
new file mode 100644
index 0000000..c564d02
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java
@@ -0,0 +1,361 @@
+package org.apache.rya.indexing.mongodb.geo;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import info.aduna.iteration.CloseableIteration;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IteratorFactory;
+import org.apache.rya.indexing.SearchFunction;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+
+/**
+ * An {@link ExternalTupleSet} that answers a single geo filter
+ * ({@link IndexingExpr}) by delegating to a {@link GeoIndexer}.
+ */
+public class MongoGeoTupleSet extends ExternalTupleSet {
+
+    private final Configuration conf;
+    private final GeoIndexer geoIndexer;
+    private final IndexingExpr filterInfo;
+
+
+    /**
+     * @param filterInfo the geo filter expression this tuple set evaluates
+     * @param geoIndexer the indexer queried during evaluation; its
+     *        configuration is also retained
+     */
+    public MongoGeoTupleSet(IndexingExpr filterInfo, GeoIndexer geoIndexer) {
+        this.filterInfo = filterInfo;
+        this.geoIndexer = geoIndexer;
+        this.conf = geoIndexer.getConf();
+    }
+
+    @Override
+    public Set<String> getBindingNames() {
+        return filterInfo.getBindingNames();
+    }
+
+    // NOTE(review): this returns a GeoTupleSet rather than a MongoGeoTupleSet.
+    // The return type is part of the public signature, so it is left as-is;
+    // confirm whether callers rely on it before changing.
+    public GeoTupleSet clone() {
+        return new GeoTupleSet(filterInfo, geoIndexer);
+    }
+
+    @Override
+    public double cardinality() {
+        return 0.0; // No estimate of the cardinality is available here.
+    }
+
+
+    @Override
+    public String getSignature() {
+        return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " ");
+    }
+
+
+
+    /**
+     * Two tuple sets are equal when their filter expressions are equal.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (!(other instanceof MongoGeoTupleSet)) {
+            return false;
+        }
+        MongoGeoTupleSet arg = (MongoGeoTupleSet) other;
+        return this.filterInfo.equals(arg.filterInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = 17;
+        result = 31*result + filterInfo.hashCode();
+
+        return result;
+    }
+
+
+
+    /**
+     * Returns an iterator over the result set of the contained IndexingExpr.
+     * <p>
+     * Should be thread-safe (concurrent invocation of this method can be
+     * expected with some query evaluators).
+     *
+     * @param bindings the bindings handed to the iterator factory
+     * @return an iteration over the binding sets produced by the geo search
+     * @throws QueryEvaluationException if the filter's function URI does not
+     *         map to a known search function
+     * @throws IllegalArgumentException if the filter carries more than one
+     *         argument value
+     */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
+            throws QueryEvaluationException {
+
+
+        URI funcURI = filterInfo.getFunction();
+        SearchFunction searchFunction = (new MongoGeoSearchFunctionFactory(conf)).getSearchFunction(funcURI);
+        if (searchFunction == null) {
+            // Fail here with a clear message instead of handing a null function
+            // to the iterator factory.
+            throw new QueryEvaluationException("Unknown Search Function: " + funcURI);
+        }
+        if(filterInfo.getArguments().length > 1) {
+            throw new IllegalArgumentException("Index functions do not support more than two arguments.");
+        }
+
+        String queryText = ((Value) filterInfo.getArguments()[0]).stringValue();
+
+        return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction);
+    }
+
+
+
+    /**
+     * Returns the appropriate search function for a given function URI. Each
+     * search function parses its query text as WKT and calls the matching
+     * query method on the enclosing tuple set's {@link GeoIndexer}.
+     */
+    public class MongoGeoSearchFunctionFactory {
+
+        Configuration conf;
+
+        private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
+
+        public MongoGeoSearchFunctionFactory(Configuration conf) {
+            this.conf = conf;
+        }
+
+
+        /**
+         * Get a {@link SearchFunction} for a given URI.
+         *
+         * @param searchFunction the function URI to look up
+         * @return the registered search function, or {@code null} when the
+         *         URI is not registered (the lookup failure is printed to
+         *         standard error)
+         */
+        public SearchFunction getSearchFunction(final URI searchFunction) {
+
+            SearchFunction geoFunc = null;
+
+            try {
+                geoFunc = getSearchFunctionInternal(searchFunction);
+            } catch (QueryEvaluationException e) {
+                e.printStackTrace();
+            }
+
+            return geoFunc;
+        }
+
+        private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException {
+            SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction);
+
+            if (sf != null) {
+                return sf;
+            } else {
+                throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue());
+            }
+        }
+
+        /**
+         * Shared skeleton of every geo search function: parse the query text
+         * as WKT, then hand the geometry to one indexer query method.
+         */
+        private abstract class WktSearchFunction implements SearchFunction {
+            private final String name;
+
+            WktSearchFunction(final String name) {
+                this.name = name;
+            }
+
+            /** Runs the indexer query that corresponds to this function. */
+            abstract CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException;
+
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                try {
+                    final Geometry geometry = new WKTReader().read(queryText);
+                    return query(geometry, contraints);
+                } catch (ParseException e) {
+                    throw new QueryEvaluationException(e);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return name;
+            }
+        }
+
+        private final SearchFunction GEO_EQUALS = new WktSearchFunction("GEO_EQUALS") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryEquals(geometry, contraints);
+            }
+        };
+
+        private final SearchFunction GEO_DISJOINT = new WktSearchFunction("GEO_DISJOINT") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryDisjoint(geometry, contraints);
+            }
+        };
+
+        private final SearchFunction GEO_INTERSECTS = new WktSearchFunction("GEO_INTERSECTS") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryIntersects(geometry, contraints);
+            }
+        };
+
+        private final SearchFunction GEO_TOUCHES = new WktSearchFunction("GEO_TOUCHES") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryTouches(geometry, contraints);
+            }
+        };
+
+        private final SearchFunction GEO_CONTAINS = new WktSearchFunction("GEO_CONTAINS") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryContains(geometry, contraints);
+            }
+        };
+
+        private final SearchFunction GEO_OVERLAPS = new WktSearchFunction("GEO_OVERLAPS") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryOverlaps(geometry, contraints);
+            }
+        };
+
+        private final SearchFunction GEO_CROSSES = new WktSearchFunction("GEO_CROSSES") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryCrosses(geometry, contraints);
+            }
+        };
+
+        private final SearchFunction GEO_WITHIN = new WktSearchFunction("GEO_WITHIN") {
+            @Override
+            CloseableIteration<Statement, QueryEvaluationException> query(Geometry geometry,
+                    StatementConstraints contraints) throws QueryEvaluationException {
+                return geoIndexer.queryWithin(geometry, contraints);
+            }
+        };
+
+        // Instance initializer: runs after the field initializers above, so
+        // every function object exists before it is registered.
+        {
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, GEO_INTERSECTS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES);
+            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java
new file mode 100644
index 0000000..7151b56
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexSetProvider;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Checks how many {@link EventQueryNode}s {@link GeoTemporalIndexSetProvider}
+ * produces for queries with differing numbers of statement patterns and
+ * geo/temporal filters.
+ */
+public class GeoTemporalProviderTest extends GeoTemporalTestBase {
+    private static final String URI_PROPERTY_AT_TIME = "Property:atTime";
+
+    /** Prefix declarations plus the opening of the SELECT shared by the geo + temporal queries. */
+    private static final String GEO_TIME_SELECT =
+            "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+            "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+            "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+            "SELECT * WHERE { ";
+
+    private GeoTemporalIndexSetProvider provider;
+    private EventStorage events;
+
+    @Before
+    public void setup() {
+        events = mock(EventStorage.class);
+        provider = new GeoTemporalIndexSetProvider(events);
+    }
+
+    /** Builds the WKT literal {@code Point(0 0)} used as the geo filter argument. */
+    private static Value wktPoint(final ValueFactory vf) {
+        return vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+    }
+
+    /** Builds the literal for the instant 2015-12-30 12:00:00 used as the temporal filter argument. */
+    private static Value noonInstant(final ValueFactory vf) {
+        return vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 0).toString());
+    }
+
+    /** Parses {@code sparql}, feeds it to the provider and asserts the number of external sets returned. */
+    private void assertExternalSetCount(final int expected, final String sparql) throws Exception {
+        final QuerySegment<EventQueryNode> segment = getQueryNode(sparql);
+        final List<EventQueryNode> found = provider.getExternalSets(segment);
+        assertEquals(expected, found.size());
+    }
+
+    /*
+     * Simplest Happy Path test
+     */
+    @Test
+    public void twoPatternsTwoFilters_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Value geo = wktPoint(vf);
+        final Value temp = noonInstant(vf);
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        final String query = GEO_TIME_SELECT +
+                "?subj <" + tempPred + "> ?time ."+
+                "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+                " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+                " FILTER(time:equals(?time, " + temp + ")) . " +
+                "}";
+        assertExternalSetCount(1, query);
+    }
+
+    /* Only the temporal pattern is present, so no external set is expected. */
+    @Test
+    public void onePatternTwoFilters_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Value geo = wktPoint(vf);
+        final Value temp = noonInstant(vf);
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        final String query = GEO_TIME_SELECT +
+                "?subj <" + tempPred + "> ?time ."+
+                " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+                " FILTER(time:equals(?time, " + temp + ")) . " +
+                "}";
+        assertExternalSetCount(0, query);
+    }
+
+    /* Both patterns but only the geo filter, so no external set is expected. */
+    @Test
+    public void twoPatternsOneFilter_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Value geo = wktPoint(vf);
+        final Value temp = noonInstant(vf);
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        final String query = GEO_TIME_SELECT +
+                "?subj <" + tempPred + "> ?time ."+
+                "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+                " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+                "}";
+        assertExternalSetCount(0, query);
+    }
+
+    /* Both patterns and no filters at all, so no external set is expected. */
+    @Test
+    public void twoPatternsNoFilter_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        final String query = GEO_TIME_SELECT +
+                "?subj <" + tempPred + "> ?time ."+
+                "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+                "}";
+        assertExternalSetCount(0, query);
+    }
+
+    @Test
+    public void twoPatternsTwoFiltersNotValid_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Value geo = wktPoint(vf);
+        final Value temp = noonInstant(vf);
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        //Only handles geo and temporal filters
+        final String query =
+                "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+                "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+                "PREFIX text: <http://rdf.useekm.com/fts#text>" +
+                "SELECT * WHERE { " +
+                "?subj <" + tempPred + "> ?time ."+
+                "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+                " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+                " FILTER(text:equals(?time, " + temp + ")) . " +
+                "}";
+        assertExternalSetCount(0, query);
+    }
+
+    /* Two subjects, but filters only on the first one: a single external set is expected. */
+    @Test
+    public void twoSubjOneFilter_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Value geo = wktPoint(vf);
+        final Value temp = noonInstant(vf);
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        final String query = GEO_TIME_SELECT +
+                "?subj <" + tempPred + "> ?time ."+
+                "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+                "?subj2 <" + tempPred + "> ?time2 ."+
+                "?subj2 <" + GeoConstants.GEO_AS_WKT + "> ?loc2 . " +
+                " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+                " FILTER(time:equals(?time, " + temp + ")) . " +
+                "}";
+        assertExternalSetCount(1, query);
+    }
+
+    /* Two subjects, each with a geo and a temporal filter: two external sets are expected. */
+    @Test
+    public void twoNode_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Value geo = wktPoint(vf);
+        final Value temp = noonInstant(vf);
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        final String query = GEO_TIME_SELECT +
+                "?subj <" + tempPred + "> ?time ."+
+                "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+                "?subj2 <" + tempPred + "> ?time2 ."+
+                "?subj2 <" + GeoConstants.GEO_AS_WKT + "> ?loc2 . " +
+                " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+                " FILTER(time:equals(?time, " + temp + ")) . " +
+                " FILTER(geos:sfContains(?loc2, " + geo + ")) . " +
+                " FILTER(time:equals(?time2, " + temp + ")) . " +
+                "}";
+        assertExternalSetCount(2, query);
+    }
+
+    /* One subject with two geo and two temporal filters: a single external set is expected. */
+    @Test
+    public void twoSubjectMultiFilter_test() throws Exception {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Value geo = wktPoint(vf);
+        final Value temp = noonInstant(vf);
+        final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+        final String query = GEO_TIME_SELECT +
+                "?subj <" + tempPred + "> ?time ."+
+                "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+                " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+                " FILTER(time:equals(?time, " + temp + ")) . " +
+                " FILTER(geos:sfWithin(?loc, " + geo + ")) . " +
+                " FILTER(time:before(?time, " + temp + ")) . " +
+                "}";
+        assertExternalSetCount(1, query);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java
new file mode 100644
index 0000000..6b6bf15
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.junit.ComparisonFailure;
+import org.mockito.Mockito;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.LineString;
+import com.vividsolutions.jts.geom.LinearRing;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.Polygon;
+import com.vividsolutions.jts.geom.PrecisionModel;
+import com.vividsolutions.jts.geom.impl.PackedCoordinateSequence;
+
+public class GeoTemporalTestBase {
+ private static final GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326);
+
+ /**
+ * Make an uniform instant with given seconds.
+ */
+ protected static TemporalInstant makeInstant(final int secondsMakeMeUnique) {
+ return new TemporalInstantRfc3339(2015, 12, 30, 12, 00, secondsMakeMeUnique);
+ }
+
+ protected static Polygon poly(final double[] arr) {
+ final LinearRing r1 = gf.createLinearRing(new PackedCoordinateSequence.Double(arr, 2));
+ final Polygon p1 = gf.createPolygon(r1, new LinearRing[] {});
+ return p1;
+ }
+
+ protected static Point point(final double x, final double y) {
+ return gf.createPoint(new Coordinate(x, y));
+ }
+
+ protected static LineString line(final double x1, final double y1, final double x2, final double y2) {
+ return new LineString(new PackedCoordinateSequence.Double(new double[] { x1, y1, x2, y2 }, 2), gf);
+ }
+
+ protected static double[] bbox(final double x1, final double y1, final double x2, final double y2) {
+ return new double[] { x1, y1, x1, y2, x2, y2, x2, y1, x1, y1 };
+ }
+
+ protected void assertEqualMongo(final Object expected, final Object actual) throws ComparisonFailure {
+ try {
+ assertEquals(expected, actual);
+ } catch(final Throwable e) {
+ throw new ComparisonFailure(e.getMessage(), expected.toString(), actual.toString());
+ }
+ }
+
+ public List<FunctionCall> getFilters(final String query) throws Exception {
+ final FunctionCallCollector collector = new FunctionCallCollector();
+ new SPARQLParser().parseQuery(query, null).getTupleExpr().visit(collector);
+ return collector.getTupleExpr();
+ }
+
+ public List<StatementPattern> getSps(final String query) throws Exception {
+ final StatementPatternCollector collector = new StatementPatternCollector();
+ new SPARQLParser().parseQuery(query, null).getTupleExpr().visit(collector);
+ return collector.getStatementPatterns();
+ }
+
+ public QuerySegment<EventQueryNode> getQueryNode(final String query) throws Exception {
+ final List<QueryModelNode> exprs = getNodes(query);
+ final QuerySegment<EventQueryNode> node = Mockito.mock(QuerySegment.class);
+ //provider only cares about Ordered nodes.
+ Mockito.when(node.getOrderedNodes()).thenReturn(exprs);
+ return node;
+ }
+
+ private static List<QueryModelNode> getNodes(final String sparql) throws Exception {
+ final NodeCollector collector = new NodeCollector();
+ new SPARQLParser().parseQuery(sparql, null).getTupleExpr().visit(collector);
+ return collector.getTupleExpr();
+ }
+
+ private static class NodeCollector extends QueryModelVisitorBase<RuntimeException> {
+ private final List<QueryModelNode> stPatterns = new ArrayList<>();
+
+ public List<QueryModelNode> getTupleExpr() {
+ return stPatterns;
+ }
+
+ @Override
+ public void meet(final FunctionCall node) {
+ stPatterns.add(node);
+ }
+
+ @Override
+ public void meet(final StatementPattern node) {
+ stPatterns.add(node);
+ }
+ }
+
+ private static class FunctionCallCollector extends QueryModelVisitorBase<RuntimeException> {
+ private final List<FunctionCall> filters = new ArrayList<>();
+
+ public List<FunctionCall> getTupleExpr() {
+ return filters;
+ }
+
+ @Override
+ public void meet(final FunctionCall node) {
+ filters.add(node);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java
new file mode 100644
index 0000000..ff778ba
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoRyaSailFactory;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.mongo.MongoITBase;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+
+import com.mongodb.MongoClient;
+
+/**
+ * Integration test that runs SPARQL queries combining a geo filter and a
+ * temporal filter against a Rya {@link Sail} configured to use Mongo with
+ * the geo-temporal index enabled.
+ * <p>
+ * {@link #setUp()} gives every test its own Mongo database name and loads
+ * the two events written by {@link #addStatements()}.
+ */
+public class MongoGeoTemporalIndexIT extends MongoITBase {
+    private static final String URI_PROPERTY_AT_TIME = "Property:atTime";
+
+    private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+    /** Incremented once per {@link #setUp()} so each test uses a distinct database name. */
+    private static final AtomicInteger COUNTER = new AtomicInteger(1);
+
+    private MongoDBRdfConfiguration conf;
+    private SailRepositoryConnection conn;
+    private MongoClient mongoClient;
+
+    /**
+     * Configures a Mongo backed, geo-temporal enabled sail, opens a connection,
+     * begins a transaction on it and loads the test events.
+     *
+     * @throws Exception The sail could not be created or the events could not be added.
+     */
+    @Before
+    public void setUp() throws Exception {
+        mongoClient = super.getMongoClient();
+        conf = new MongoDBRdfConfiguration();
+        conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, MongoGeoTemporalIndexIT.class.getSimpleName() + "_" + COUNTER.getAndIncrement());
+        conf.set(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya");
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya");
+        conf.setBoolean(ConfigUtils.USE_MONGO, true);
+        conf.setBoolean(OptionalConfigUtils.USE_GEOTEMPORAL, true);
+        conf.setMongoClient(mongoClient);
+
+        final Sail sail = GeoRyaSailFactory.getInstance(conf);
+        // NOTE(review): neither the connection nor the transaction begun here is
+        // closed by this class; presumably MongoITBase tears the store down - confirm.
+        conn = new SailRepository(sail).getConnection();
+        conn.begin();
+
+        addStatements();
+    }
+
+    /**
+     * Verifies that an event for {@code urn:event1} can be read back through
+     * the indexer's {@link EventStorage} after the statements were added.
+     */
+    @Test
+    public void ensureInEventStore_Test() throws Exception {
+        final MongoGeoTemporalIndexer indexer = new MongoGeoTemporalIndexer();
+        indexer.initIndexer(conf, mongoClient);
+
+        final EventStorage events = indexer.getEventStorage(conf);
+        final RyaURI subject = new RyaURI("urn:event1");
+        final Optional<Event> event = events.get(subject);
+        assertTrue(event.isPresent());
+    }
+
+    /**
+     * Queries with a constant subject and expects exactly the bindings of
+     * {@code urn:event1}.
+     */
+    @Test
+    public void constantSubjQuery_Test() throws Exception {
+        final Set<BindingSet> results = new HashSet<>(evaluate(geoTemporalQuery("<urn:event1>")));
+
+        assertEquals(1, results.size());
+        assertEquals(expectedBinding("POINT (0 0)"), results.iterator().next());
+    }
+
+    /**
+     * Queries with a variable subject and expects the bindings of both events.
+     * The query has no ORDER BY, so the two rows are matched without relying
+     * on the order in which they are returned.
+     */
+    @Test
+    public void variableSubjQuery_Test() throws Exception {
+        final List<BindingSet> results = evaluate(geoTemporalQuery("?subj"));
+
+        assertEquals(2, results.size());
+        assertTrue(results.contains(expectedBinding("POINT (0 0)")));
+        assertTrue(results.contains(expectedBinding("POINT (1 1)")));
+    }
+
+    /**
+     * Builds the SPARQL query shared by the query tests.
+     *
+     * @param subject - The subject term used in both statement patterns, either
+     *   a bracketed URI such as {@code <urn:event1>} or a variable such as {@code ?subj}.
+     * @return The query text.
+     */
+    private static String geoTemporalQuery(final String subject) {
+        return
+              "PREFIX time: <http://www.w3.org/2006/time#> \n"
+            + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+            + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+            + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+            + "SELECT * "
+            + "WHERE { "
+            + " " + subject + " time:atTime ?time . "
+            + " " + subject + " geo:asWKT ?point . "
+            + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+            + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) "
+            + "}";
+    }
+
+    /**
+     * Evaluates a SPARQL tuple query on the test connection and drains it.
+     *
+     * @param sparql - The query to evaluate.
+     * @return Every binding set produced, in the order it was returned.
+     * @throws Exception The query could not be prepared or evaluated.
+     */
+    private List<BindingSet> evaluate(final String sparql) throws Exception {
+        final TupleQueryResult rez = conn.prepareTupleQuery(QueryLanguage.SPARQL, sparql).evaluate();
+        try {
+            final List<BindingSet> results = new ArrayList<>();
+            while (rez.hasNext()) {
+                results.add(rez.next());
+            }
+            return results;
+        } finally {
+            // Release the result even when iteration fails.
+            rez.close();
+        }
+    }
+
+    /**
+     * @param pointWkt - The text expected for the {@code point} binding.
+     * @return A binding set holding {@code point} and the shared {@code time} literal.
+     */
+    private static MapBindingSet expectedBinding(final String pointWkt) {
+        final MapBindingSet expected = new MapBindingSet();
+        expected.addBinding("point", VF.createLiteral(pointWkt));
+        expected.addBinding("time", VF.createLiteral("2015-12-30T12:00:00Z"));
+        return expected;
+    }
+
+    /**
+     * Adds two events, {@code urn:event1} at Point(0 0) and {@code urn:event2}
+     * at Point(1 1), both stamped with the same instant.
+     */
+    private void addStatements() throws Exception {
+        addEvent("urn:event1", "Point(0 0)");
+        addEvent("urn:event2", "Point(1 1)");
+    }
+
+    /**
+     * Adds the temporal statement followed by the geo statement for one event.
+     *
+     * @param subjectUri - The URI of the event's subject.
+     * @param wkt - The WKT text stored as the event's geometry literal.
+     */
+    private void addEvent(final String subjectUri, final String wkt) throws Exception {
+        final URI subject = VF.createURI(subjectUri);
+        final URI atTime = VF.createURI(URI_PROPERTY_AT_TIME);
+
+        final Value instant = VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 0).toString());
+        conn.add(VF.createStatement(subject, atTime, instant));
+
+        final Value geometry = VF.createLiteral(wkt, GeoConstants.XMLSCHEMA_OGC_WKT);
+        conn.add(VF.createStatement(subject, GeoConstants.GEO_AS_WKT, geometry));
+    }
+}