You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/06/14 18:50:14 UTC
[2/5] incubator-rya git commit: RYA-237 Mongo GeoTemporal Indexer
RYA-237 Mongo GeoTemporal Indexer
Mongo support for the GeoTemporalIndexer
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/63095d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/63095d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/63095d45
Branch: refs/heads/master
Commit: 63095d45a357da622343b42ed5ce22fd8217cc80
Parents: af5964a
Author: isper3at <sm...@gmail.com>
Authored: Wed Jan 18 15:38:11 2017 -0500
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Wed Jun 14 13:27:56 2017 -0400
----------------------------------------------------------------------
.../mongo/EventDocumentConverter.java | 147 ++++++++++
.../geotemporal/mongo/EventUpdater.java | 85 ++++++
.../GeoTemporalMongoDBStorageStrategy.java | 293 +++++++++++++++++++
.../geotemporal/mongo/MongoEventStorage.java | 196 +++++++++++++
.../mongo/MongoGeoTemporalIndexer.java | 222 ++++++++++++++
5 files changed, 943 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
new file mode 100644
index 0000000..a41428e
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.bson.Document;
+import org.bson.types.BasicBSONList;
+import org.joda.time.DateTime;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+
+public class EventDocumentConverter implements DocumentConverter<Event>{
+ public static final String SUBJECT = "_id";
+ public static final String GEO_KEY = "location";
+ public static final String INTERVAL_START = "start";
+ public static final String INTERVAL_END = "end";
+ public static final String INSTANT = "instant";
+
+ private final GeoMongoDBStorageStrategy geoAdapter = new GeoMongoDBStorageStrategy(0);
+
+ @Override
+ public Document toDocument(final Event event) {
+ requireNonNull(event);
+
+ final Document doc = new Document();
+ doc.append(SUBJECT, event.getSubject().getData());
+
+ if(event.getGeometry().isPresent()) {
+ final BasicBSONList points = new BasicBSONList();
+ for(final double[] point : geoAdapter.getCorrespondingPoints(event.getGeometry().get())) {
+ final BasicBSONList pointDoc = new BasicBSONList();
+ for(final double p : point) {
+ pointDoc.add(p);
+ }
+ points.add(pointDoc);
+ }
+
+ doc.append(GEO_KEY, points);
+ }
+ if(event.isInstant()) {
+ if(event.getInstant().isPresent()) {
+ doc.append(INSTANT, event.getInstant().get().getAsDateTime().toDate());
+ }
+ } else {
+ if(event.getInterval().isPresent()) {
+ doc.append(INTERVAL_START, event.getInterval().get().getHasBeginning().getAsDateTime().toDate());
+ doc.append(INTERVAL_END, event.getInterval().get().getHasEnd().getAsDateTime().toDate());
+ }
+ }
+
+ return doc;
+ }
+
+ @Override
+ public Event fromDocument(final Document document) throws DocumentConverterException {
+ requireNonNull(document);
+
+ final boolean isInstant;
+
+ // Preconditions.
+ if(!document.containsKey(SUBJECT)) {
+ throw new DocumentConverterException("Could not convert document '" + document +
+ "' because its '" + SUBJECT + "' field is missing.");
+ }
+
+ if(document.containsKey(INSTANT)) {
+ isInstant = true;
+ } else {
+ isInstant = false;
+ }
+
+ final String subject = document.getString(SUBJECT);
+
+ final Event.Builder builder = new Event.Builder()
+ .setSubject(new RyaURI(subject));
+
+ if(document.containsKey(GEO_KEY)) {
+ final List<List<Double>> pointsList = (List<List<Double>>) document.get(GEO_KEY);
+ final Coordinate[] coords = new Coordinate[pointsList.size()];
+
+ int ii = 0;
+ for(final List<Double> point : pointsList) {
+ coords[ii] = new Coordinate(point.get(0), point.get(1));
+ ii++;
+ }
+
+ final GeometryFactory geoFact = new GeometryFactory();
+ final Geometry geo;
+ if(coords.length == 1) {
+ geo = geoFact.createPoint(coords[0]);
+ } else {
+ geo = geoFact.createPolygon(coords);
+ }
+ builder.setGeometry(geo);
+ }
+
+ if(isInstant) {
+ //we already know the key exists
+ final Date date = (Date) document.get(INSTANT);
+ final DateTime dt = new DateTime(date.getTime());
+ final TemporalInstant instant = new TemporalInstantRfc3339(dt);
+ builder.setTemporalInstant(instant);
+ } else if(document.containsKey(INTERVAL_START)){
+ Date date = (Date) document.get(INTERVAL_START);
+ DateTime dt = new DateTime(date.getTime());
+ final TemporalInstant begining = new TemporalInstantRfc3339(dt);
+
+ date = (Date) document.get(INTERVAL_END);
+ dt = new DateTime(date.getTime());
+ final TemporalInstant end = new TemporalInstantRfc3339(dt);
+
+ final TemporalInterval interval = new TemporalInterval(begining, end);
+ builder.setTemporalInterval(interval);
+ }
+ return builder.build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
new file mode 100644
index 0000000..c9f4658
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException;
+import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Performs update operations over an {@link EventStorage}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class EventUpdater implements DocumentUpdater<RyaURI, Event>{
+ private final EventStorage events;
+
+ /**
+ * Constructs an instance of {@link EventUpdater}
+ *
+ * @param events - The storage this updater operates over. (not null)
+ */
+ public EventUpdater(final EventStorage events) {
+ this.events = requireNonNull(events);
+ }
+
+ @Override
+ public Optional<Event> getOld(final RyaURI key) throws EventStorageException {
+ try {
+ return events.get(key);
+ } catch (final ObjectStorageException e) {
+ throw new EventStorageException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void create(final Event newObj) throws EventStorageException {
+ try {
+ events.create(newObj);
+ } catch (final ObjectStorageException e) {
+ throw new EventStorageException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void update(final Event old, final Event updated) throws EventStorageException {
+ try {
+ events.update(old, updated);
+ } catch (final ObjectStorageException e) {
+ throw new EventStorageException(e.getMessage(), e);
+ }
+ }
+
+ public void delete(final Event event) throws EventStorageException {
+ try {
+ events.delete(event.getSubject());
+ } catch (final ObjectStorageException e) {
+ throw new EventStorageException(e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
new file mode 100644
index 0000000..352dcb6
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INSTANT;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_END;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_START;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.GeoPolicy;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.TemporalPolicy;
+import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery;
+import org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.MalformedQueryException;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+import jline.internal.Log;
+
+/**
+ * TODO: doc
+ */
+public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
+ private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class);
+ private static final String GEO_KEY = "location";
+ private static final String TIME_KEY = "time";
+ private final TemporalMongoDBStorageStrategy temporalStrategy;
+ private final GeoMongoDBStorageStrategy geoStrategy;
+
+ public GeoTemporalMongoDBStorageStrategy() {
+ geoStrategy = new GeoMongoDBStorageStrategy(0);
+ temporalStrategy = new TemporalMongoDBStorageStrategy();
+ }
+
+ @Override
+ public void createIndices(final DBCollection coll){
+ coll.createIndex(GEO_KEY);
+ coll.createIndex(TIME_KEY);
+ }
+
+ public DBObject getFilterQuery(final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters) throws GeoTemporalIndexException {
+ final QueryBuilder builder = QueryBuilder.start();
+
+ if(!geoFilters.isEmpty()) {
+ final DBObject[] geo = getGeoObjs(geoFilters);
+ if(!temporalFilters.isEmpty()) {
+ final DBObject[] temporal = getTemporalObjs(temporalFilters);
+ builder.and(oneOrAnd(geo), oneOrAnd(temporal));
+ return builder.get();
+ } else {
+ return oneOrAnd(geo);
+ }
+ } else if(!temporalFilters.isEmpty()) {
+ final DBObject[] temporal = getTemporalObjs(temporalFilters);
+ return oneOrAnd(temporal);
+ } else {
+ return builder.get();
+ }
+ }
+
+ private DBObject oneOrAnd(final DBObject[] dbos) {
+ if(dbos.length == 1) {
+ return dbos[0];
+ }
+ return QueryBuilder.start()
+ .and(dbos)
+ .get();
+ }
+
+ @Override
+ public DBObject serialize(final RyaStatement ryaStatement) {
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start("_id", ryaStatement.getSubject().hashCode());
+ final URI obj = ryaStatement.getObject().getDataType();
+
+
+ if(obj.equals(GeoConstants.GEO_AS_WKT) || obj.equals(GeoConstants.GEO_AS_GML) ||
+ obj.equals(GeoConstants.XMLSCHEMA_OGC_GML) || obj.equals(GeoConstants.XMLSCHEMA_OGC_WKT)) {
+ try {
+ final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+ final Geometry geo = GeoParseUtils.getGeometry(statement);
+ builder.add(GEO_KEY, geoStrategy.getCorrespondingPoints(geo));
+ } catch (final ParseException e) {
+ LOG.error("Could not create geometry for statement " + ryaStatement, e);
+ return null;
+ }
+ } else {
+ builder.add(TIME_KEY, temporalStrategy.getTimeValue(ryaStatement.getObject().getData()));
+ }
+ return builder.get();
+ }
+
+ private DBObject[] getGeoObjs(final Collection<IndexingExpr> geoFilters) {
+ final List<DBObject> objs = new ArrayList<>();
+ geoFilters.forEach(filter -> {
+ final GeoPolicy policy = GeoPolicy.fromURI(filter.getFunction());
+ final WKTReader reader = new WKTReader();
+ final String geoStr = filter.getArguments()[0].stringValue();
+ try {
+ //This method is what is used in the GeoIndexer.
+ final Geometry geo = reader.read(geoStr);
+ objs.add(getGeoObject(geo, policy));
+ } catch (final GeoTemporalIndexException | UnsupportedOperationException | ParseException e) {
+ Log.error("Unable to parse '" + geoStr + "'.", e);
+ }
+ });
+ return objs.toArray(new DBObject[]{});
+ }
+
+ private DBObject[] getTemporalObjs(final Collection<IndexingExpr> temporalFilters) {
+ final List<DBObject> objs = new ArrayList<>();
+ temporalFilters.forEach(filter -> {
+ final TemporalPolicy policy = TemporalPolicy.fromURI(filter.getFunction());
+ final String timeStr = filter.getArguments()[0].stringValue();
+ final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(timeStr);
+ if(matcher.find()) {
+ final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(timeStr);
+ if(policy == TemporalPolicy.INSTANT_AFTER_INSTANT ||
+ policy == TemporalPolicy.INSTANT_BEFORE_INSTANT ||
+ policy == TemporalPolicy.INSTANT_EQUALS_INSTANT) {
+ if(interval == null) {
+ Log.error("Cannot perform temporal interval based queries on an instant.");
+ }
+ }
+ objs.add(getTemporalObject(interval, policy));
+ } else {
+ final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(timeStr));
+ if(policy != TemporalPolicy.INSTANT_AFTER_INSTANT &&
+ policy != TemporalPolicy.INSTANT_BEFORE_INSTANT &&
+ policy != TemporalPolicy.INSTANT_EQUALS_INSTANT) {
+ Log.error("Cannot perform temporal instant based queries on an interval.");
+ }
+ objs.add(getTemporalObject(instant, policy));
+ }
+ });
+ return objs.toArray(new DBObject[]{});
+ }
+
+ private DBObject getGeoObject (final Geometry geo, final GeoPolicy policy) throws GeoTemporalIndexException {
+ switch(policy) {
+ case CONTAINS:
+ throw new UnsupportedOperationException("Contains queries are not supported in Mongo DB.");
+ case CROSSES:
+ throw new UnsupportedOperationException("Crosses queries are not supported in Mongo DB.");
+ case DISJOINT:
+ throw new UnsupportedOperationException("Disjoint queries are not supported in Mongo DB.");
+ case EQUALS:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(EQUALS, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ case INTERSECTS:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(INTERSECTS, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ case OVERLAPS:
+ throw new UnsupportedOperationException("Overlaps queries are not supported in Mongo DB.");
+ case TOUCHES:
+ throw new UnsupportedOperationException("Touches queries are not supported in Mongo DB.");
+ case WITHIN:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(WITHIN, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ default:
+ return new BasicDBObject();
+ }
+ }
+
+ private DBObject getTemporalObject(final TemporalInstant instant, final TemporalPolicy policy) {
+ final DBObject temporalObj;
+ switch(policy) {
+ case INSTANT_AFTER_INSTANT:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .greaterThan(instant.getAsDateTime().toDate())
+ .get();
+ break;
+ case INSTANT_BEFORE_INSTANT:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .lessThan(instant.getAsDateTime().toDate())
+ .get();
+ break;
+ case INSTANT_EQUALS_INSTANT:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .is(instant.getAsDateTime().toDate())
+ .get();
+ break;
+ default:
+ temporalObj = new BasicDBObject();
+ }
+ return temporalObj;
+ }
+
+ private DBObject getTemporalObject(final TemporalInterval interval, final TemporalPolicy policy) {
+ final DBObject temporalObj;
+ switch(policy) {
+ case INSTANT_AFTER_INTERVAL:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+ .get();
+ break;
+ case INSTANT_BEFORE_INTERVAL:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+ .get();
+ break;
+ case INSTANT_END_INTERVAL:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .is(interval.getHasEnd().getAsDateTime().toDate())
+ .get();
+ break;
+ case INSTANT_IN_INTERVAL:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .greaterThan(interval.getHasBeginning().getAsDateTime().toDate())
+ .lessThan(interval.getHasEnd().getAsDateTime().toDate())
+ .get();
+ break;
+ case INSTANT_START_INTERVAL:
+ temporalObj = QueryBuilder.start(INSTANT)
+ .is(interval.getHasBeginning().getAsDateTime().toDate())
+ .get();
+ break;
+ case INTERVAL_AFTER:
+ temporalObj = QueryBuilder.start(INTERVAL_START)
+ .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+ .get();
+ break;
+ case INTERVAL_BEFORE:
+ temporalObj = QueryBuilder.start(INTERVAL_END)
+ .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+ .get();
+ break;
+ case INTERVAL_EQUALS:
+ temporalObj = QueryBuilder.start(INTERVAL_START)
+ .is(interval.getHasBeginning().getAsDateTime().toDate())
+ .and(INTERVAL_END)
+ .is(interval.getHasEnd().getAsDateTime().toDate())
+ .get();
+ break;
+ default:
+ temporalObj = new BasicDBObject();
+ }
+ return temporalObj;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
new file mode 100644
index 0000000..8ddf075
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.entity.model.TypedEntity;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
+import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBObject;
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+
+public class MongoEventStorage implements EventStorage {
+
+ protected static final String COLLECTION_NAME = "geotemporal-events";
+
+ private static final EventDocumentConverter EVENT_CONVERTER = new EventDocumentConverter();
+
+ /**
+ * A client connected to the Mongo instance that hosts the Rya instance.
+ */
+ protected final MongoClient mongo;
+
+ /**
+ * The name of the Rya instance the {@link TypedEntity}s are for.
+ */
+ protected final String ryaInstanceName;
+
+ /*
+ * Used to get the filter query objects.
+ */
+ private final GeoTemporalMongoDBStorageStrategy queryAdapter;
+
+ /**
+ * Constructs an instance of {@link MongoEntityStorage}.
+ *
+ * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null)
+ * @param ryaInstanceName - The name of the Rya instance the {@link TypedEntity}s are for. (not null)
+ */
+ public MongoEventStorage(final MongoClient mongo, final String ryaInstanceName) {
+ this.mongo = requireNonNull(mongo);
+ this.ryaInstanceName = requireNonNull(ryaInstanceName);
+ queryAdapter = new GeoTemporalMongoDBStorageStrategy();
+ }
+
+ @Override
+ public void create(final Event event) throws EventStorageException {
+ requireNonNull(event);
+
+ try {
+ mongo.getDatabase(ryaInstanceName)
+ .getCollection(COLLECTION_NAME)
+ .insertOne(EVENT_CONVERTER.toDocument(event));
+ } catch(final MongoException e) {
+ final ErrorCategory category = ErrorCategory.fromErrorCode( e.getCode() );
+ if(category == ErrorCategory.DUPLICATE_KEY) {
+ throw new EventAlreadyExistsException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e);
+ }
+ throw new EventStorageException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e);
+ }
+ }
+
+ @Override
+ public Optional<Event> get(final RyaURI subject) throws EventStorageException {
+ requireNonNull(subject);
+
+ try {
+ final Document document = mongo.getDatabase(ryaInstanceName)
+ .getCollection(COLLECTION_NAME)
+ .find( new BsonDocument(EventDocumentConverter.SUBJECT, new BsonString(subject.getData())) )
+ .first();
+
+ return document == null ?
+ Optional.empty() :
+ Optional.of( EVENT_CONVERTER.fromDocument(document) );
+
+ } catch(final MongoException | DocumentConverterException e) {
+ throw new EventStorageException("Could not get the Event with Subject '" + subject.getData() + "'.", e);
+ }
+ }
+
+ @Override
+ public Collection<Event> search(final Optional<RyaURI> subject, final Optional<Collection<IndexingExpr>> geoFilters, final Optional<Collection<IndexingExpr>> temporalFilters) throws EventStorageException {
+ requireNonNull(subject);
+
+ try {
+ final Collection<IndexingExpr> geos = (geoFilters.isPresent() ? geoFilters.get() : new ArrayList<>());
+ final Collection<IndexingExpr> tempos = (temporalFilters.isPresent() ? temporalFilters.get() : new ArrayList<>());
+ final DBObject filterObj = queryAdapter.getFilterQuery(geos, tempos);
+
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder
+ .start(filterObj.toMap());
+ if(subject.isPresent()) {
+ builder.append(EventDocumentConverter.SUBJECT, subject.get().getData());
+ }
+ final MongoCursor<Document> results = mongo.getDatabase(ryaInstanceName)
+ .getCollection(COLLECTION_NAME)
+ .find( BsonDocument.parse(builder.get().toString()) )
+ .iterator();
+
+ final List<Event> events = new ArrayList<>();
+ final EventDocumentConverter adapter = new EventDocumentConverter();
+ while(results.hasNext()) {
+ events.add(adapter.fromDocument(results.next()));
+ }
+ return events;
+ } catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) {
+ throw new EventStorageException("Could not get the Event.", e);
+ }
+ }
+
+ @Override
+ public void update(final Event old, final Event updated) throws StaleUpdateException, EventStorageException {
+ requireNonNull(old);
+ requireNonNull(updated);
+
+ // The updated entity must have the same Subject as the one it is replacing.
+ if(!old.getSubject().equals(updated.getSubject())) {
+ throw new EventStorageException("The old Event and the updated Event must have the same Subject. " +
+ "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData());
+ }
+
+ final Set<Bson> filters = new HashSet<>();
+
+ // Must match the old entity's Subject.
+ filters.add( makeSubjectFilter(old.getSubject()) );
+
+ // Do a find and replace.
+ final Bson oldEntityFilter = Filters.and(filters);
+ final Document updatedDoc = EVENT_CONVERTER.toDocument(updated);
+
+ final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME);
+ if(collection.findOneAndReplace(oldEntityFilter, updatedDoc) == null) {
+ throw new StaleUpdateException("Could not update the Event with Subject '" + updated.getSubject().getData() + ".");
+ }
+ }
+
+ @Override
+ public boolean delete(final RyaURI subject) throws EventStorageException {
+ requireNonNull(subject);
+
+ try {
+ final Document deleted = mongo.getDatabase(ryaInstanceName)
+ .getCollection(COLLECTION_NAME)
+ .findOneAndDelete( makeSubjectFilter(subject) );
+
+ return deleted != null;
+
+ } catch(final MongoException e) {
+ throw new EventStorageException("Could not delete the Event with Subject '" + subject.getData() + "'.", e);
+ }
+ }
+
+ private static Bson makeSubjectFilter(final RyaURI subject) {
+ return Filters.eq(EventDocumentConverter.SUBJECT, subject.getData());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
new file mode 100644
index 0000000..1baab18
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.mongodb.AbstractMongoIndexer;
+import org.apache.rya.indexing.mongodb.IndexingException;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+
+/**
+ * Indexer that stores 2 separate statements as one 'Event' entity.
+ * <p>
+ * The statements are required to have the same subject, one must be
+ * a Geo based statement and the other a temporal based statement.
+ * <p>
+ * This indexer is later used when querying for geo/temporal statements
+ * in the format of:
+ * <pre>
+ * {@code
+ * QUERY PARAMS
+ * ?SomeSubject geo:predicate ?Location
+ * ?SomeSubject time:predicate ?Time
+ * Filter(?Location, geoFunction())
+ * Filter(?Time, temporalFunction())
+ * }
+ *
+ * The number of filters is not strict, but there must be at least one
+ * query pattern for geo and one for temporal as well as at least one
+ * filter for each type.
+ */
+public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMongoDBStorageStrategy> implements GeoTemporalIndexer {
+ private static final Logger LOG = Logger.getLogger(MongoGeoTemporalIndexer.class);
+ public static final String GEO_TEMPORAL_COLLECTION = "geo_temporal";
+
+ private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>();
+ private final AtomicReference<EventStorage> events = new AtomicReference<>();
+
+ @Override
+ public void init() {
+ initCore();
+ predicates = ConfigUtils.getGeoPredicates(conf);
+ predicates.addAll(ConfigUtils.getTemporalPredicates(conf));
+ storageStrategy = new GeoTemporalMongoDBStorageStrategy();
+ }
+
+ @Override
+ public void setConf(final Configuration conf) {
+ requireNonNull(conf);
+ events.set(null);
+ events.set(getEventStorage(conf));
+ super.conf = conf;
+ configuration.set(new MongoDBRdfConfiguration(conf));
+ }
+
+ @Override
+ public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+ requireNonNull(ryaStatement);
+
+ try {
+ updateEvent(ryaStatement.getSubject(), ryaStatement);
+ } catch (IndexingException | ParseException e) {
+ throw new IOException("Failed to update the Entity index.", e);
+ }
+ }
+
+ @Override
+ public void deleteStatement(final RyaStatement statement) throws IOException {
+ requireNonNull(statement);
+ final RyaURI subject = statement.getSubject();
+ try {
+ final EventStorage eventStore = events.get();
+ checkState(events != null, "Must set this indexers configuration before storing statements.");
+
+ new EventUpdater(eventStore).update(subject, old -> {
+ final Event.Builder updated;
+ if(!old.isPresent()) {
+ return Optional.empty();
+ } else {
+ updated = Event.builder(old.get());
+ }
+
+ final Event currentEvent = updated.build();
+ final URI pred = statement.getObject().getDataType();
+ if((pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) ||
+ pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML))
+ && currentEvent.getGeometry().isPresent()) {
+ //is geo and needs to be removed.
+ try {
+ if(currentEvent.getGeometry().get().equals(GeoParseUtils.getGeometry(RyaToRdfConversions.convertStatement(statement)))) {
+ updated.setGeometry(null);
+ }
+ } catch (final Exception e) {
+ LOG.debug("Unable to parse the stored geometry.");
+ }
+ } else {
+ //is time
+ final String dateTime = statement.getObject().getData();
+ final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+ if (matcher.find()) {
+ final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+ if(currentEvent.getInterval().get().equals(interval)) {
+ updated.setTemporalInterval(null);
+ }
+ } else {
+ final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+ if(currentEvent.getInstant().get().equals(instant)) {
+ updated.setTemporalInstant(null);
+ }
+ }
+ }
+ return Optional.of(updated.build());
+ });
+ } catch (final IndexingException e) {
+ throw new IOException("Failed to update the Entity index.", e);
+ }
+ }
+
+ private void updateEvent(final RyaURI subject, final RyaStatement statement) throws IndexingException, ParseException {
+ final EventStorage eventStore = events.get();
+ checkState(events != null, "Must set this indexers configuration before storing statements.");
+
+ new EventUpdater(eventStore).update(subject, old -> {
+ final Event.Builder updated;
+ if(!old.isPresent()) {
+ updated = Event.builder()
+ .setSubject(subject);
+ } else {
+ updated = Event.builder(old.get());
+ }
+
+ final URI pred = statement.getObject().getDataType();
+ if(pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) ||
+ pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML)) {
+ //is geo
+ try {
+ final Statement geoStatement = RyaToRdfConversions.convertStatement(statement);
+ final Geometry geometry = GeoParseUtils.getGeometry(geoStatement);
+ updated.setGeometry(geometry);
+ } catch (final ParseException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ } else {
+ //is time
+ final String dateTime = statement.getObject().getData();
+ final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+ if (matcher.find()) {
+ final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+ updated.setTemporalInterval(interval);
+ } else {
+ final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+ updated.setTemporalInstant(instant);
+ }
+ }
+ return Optional.of(updated.build());
+ });
+ }
+
+ @Override
+ public String getCollectionName() {
+ return ConfigUtils.getTablePrefix(conf) + GEO_TEMPORAL_COLLECTION;
+ }
+
+ @Override
+ public EventStorage getEventStorage(final Configuration conf) {
+ if(events.get() != null) {
+ return events.get();
+ }
+
+ final MongoDBRdfConfiguration mongoConf = (MongoDBRdfConfiguration) conf;
+ mongoClient = mongoConf.getMongoClient();
+ if (mongoClient == null) {
+ mongoClient = MongoConnectorFactory.getMongoClient(conf);
+ }
+ final String ryaInstanceName = mongoConf.getMongoDBName();
+ events.set(new MongoEventStorage(mongoClient, ryaInstanceName));
+ return events.get();
+ }
+}