You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/06/14 18:50:14 UTC

[2/5] incubator-rya git commit: RYA-237 Mongo GeoTemporal Indexer

RYA-237 Mongo GeoTemporal Indexer

Mongo support for the GeoTemporalIndexer


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/63095d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/63095d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/63095d45

Branch: refs/heads/master
Commit: 63095d45a357da622343b42ed5ce22fd8217cc80
Parents: af5964a
Author: isper3at <sm...@gmail.com>
Authored: Wed Jan 18 15:38:11 2017 -0500
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Wed Jun 14 13:27:56 2017 -0400

----------------------------------------------------------------------
 .../mongo/EventDocumentConverter.java           | 147 ++++++++++
 .../geotemporal/mongo/EventUpdater.java         |  85 ++++++
 .../GeoTemporalMongoDBStorageStrategy.java      | 293 +++++++++++++++++++
 .../geotemporal/mongo/MongoEventStorage.java    | 196 +++++++++++++
 .../mongo/MongoGeoTemporalIndexer.java          | 222 ++++++++++++++
 5 files changed, 943 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
new file mode 100644
index 0000000..a41428e
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.bson.Document;
+import org.bson.types.BasicBSONList;
+import org.joda.time.DateTime;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+
+public class EventDocumentConverter implements DocumentConverter<Event>{
+    public static final String SUBJECT = "_id";
+    public static final String GEO_KEY = "location";
+    public static final String INTERVAL_START = "start";
+    public static final String INTERVAL_END = "end";
+    public static final String INSTANT = "instant";
+
+    private final GeoMongoDBStorageStrategy geoAdapter = new GeoMongoDBStorageStrategy(0);
+
+    @Override
+    public Document toDocument(final Event event) {
+        requireNonNull(event);
+
+        final Document doc = new Document();
+        doc.append(SUBJECT, event.getSubject().getData());
+
+        if(event.getGeometry().isPresent()) {
+            final BasicBSONList points = new BasicBSONList();
+            for(final double[] point : geoAdapter.getCorrespondingPoints(event.getGeometry().get())) {
+                final BasicBSONList pointDoc = new BasicBSONList();
+                for(final double p : point) {
+                    pointDoc.add(p);
+                }
+                points.add(pointDoc);
+            }
+
+            doc.append(GEO_KEY, points);
+        }
+        if(event.isInstant()) {
+            if(event.getInstant().isPresent()) {
+                doc.append(INSTANT, event.getInstant().get().getAsDateTime().toDate());
+            }
+        } else {
+            if(event.getInterval().isPresent()) {
+                doc.append(INTERVAL_START, event.getInterval().get().getHasBeginning().getAsDateTime().toDate());
+                doc.append(INTERVAL_END, event.getInterval().get().getHasEnd().getAsDateTime().toDate());
+            }
+        }
+
+        return doc;
+    }
+
+    @Override
+    public Event fromDocument(final Document document) throws DocumentConverterException {
+        requireNonNull(document);
+
+        final boolean isInstant;
+
+        // Preconditions.
+        if(!document.containsKey(SUBJECT)) {
+            throw new DocumentConverterException("Could not convert document '" + document +
+                    "' because its '" + SUBJECT + "' field is missing.");
+        }
+
+        if(document.containsKey(INSTANT)) {
+            isInstant = true;
+        } else {
+            isInstant = false;
+        }
+
+        final String subject = document.getString(SUBJECT);
+
+        final Event.Builder builder = new Event.Builder()
+            .setSubject(new RyaURI(subject));
+
+        if(document.containsKey(GEO_KEY)) {
+            final List<List<Double>> pointsList = (List<List<Double>>) document.get(GEO_KEY);
+            final Coordinate[] coords = new Coordinate[pointsList.size()];
+
+            int ii = 0;
+            for(final List<Double> point : pointsList) {
+                coords[ii] = new Coordinate(point.get(0), point.get(1));
+                ii++;
+            }
+
+            final GeometryFactory geoFact = new GeometryFactory();
+            final Geometry geo;
+            if(coords.length == 1) {
+                geo = geoFact.createPoint(coords[0]);
+            } else {
+                geo = geoFact.createPolygon(coords);
+            }
+            builder.setGeometry(geo);
+        }
+
+        if(isInstant) {
+            //we already know the key exists
+            final Date date = (Date) document.get(INSTANT);
+            final DateTime dt = new DateTime(date.getTime());
+            final TemporalInstant instant = new TemporalInstantRfc3339(dt);
+            builder.setTemporalInstant(instant);
+        } else if(document.containsKey(INTERVAL_START)){
+            Date date = (Date) document.get(INTERVAL_START);
+            DateTime dt = new DateTime(date.getTime());
+            final TemporalInstant begining = new TemporalInstantRfc3339(dt);
+
+            date = (Date) document.get(INTERVAL_END);
+            dt = new DateTime(date.getTime());
+            final TemporalInstant end = new TemporalInstantRfc3339(dt);
+
+            final TemporalInterval interval = new TemporalInterval(begining, end);
+            builder.setTemporalInterval(interval);
+        }
+        return builder.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
new file mode 100644
index 0000000..c9f4658
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException;
+import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link DocumentUpdater} whose reads and writes are delegated to an
+ * {@link EventStorage}. Storage failures are reported as
+ * {@link EventStorageException}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class EventUpdater implements DocumentUpdater<RyaURI, Event>{
+    private final EventStorage events;
+
+    /**
+     * Creates an updater on top of the given storage.
+     *
+     * @param events - The storage every operation is forwarded to. (not null)
+     */
+    public EventUpdater(final EventStorage events) {
+        this.events = requireNonNull(events);
+    }
+
+    /**
+     * Looks up the event currently stored for a subject.
+     *
+     * @param key - The subject of the event.
+     * @return Whatever the storage returns for that subject.
+     * @throws EventStorageException The storage could not be read.
+     */
+    @Override
+    public Optional<Event> getOld(final RyaURI key) throws EventStorageException {
+        try {
+            return events.get(key);
+        } catch (final ObjectStorageException cause) {
+            throw rewrap(cause);
+        }
+    }
+
+    /**
+     * Stores a new event.
+     *
+     * @param newObj - The event to store.
+     * @throws EventStorageException The storage rejected the event.
+     */
+    @Override
+    public void create(final Event newObj) throws EventStorageException {
+        try {
+            events.create(newObj);
+        } catch (final ObjectStorageException cause) {
+            throw rewrap(cause);
+        }
+    }
+
+    /**
+     * Replaces a stored event with a newer version.
+     *
+     * @param old - The version that is expected to be stored.
+     * @param updated - The version to store instead.
+     * @throws EventStorageException The storage could not apply the update.
+     */
+    @Override
+    public void update(final Event old, final Event updated) throws EventStorageException {
+        try {
+            events.update(old, updated);
+        } catch (final ObjectStorageException cause) {
+            throw rewrap(cause);
+        }
+    }
+
+    /**
+     * Removes the event stored for the given event's subject.
+     *
+     * @param event - The event whose subject identifies what to remove.
+     * @throws EventStorageException The storage could not remove the event.
+     */
+    public void delete(final Event event) throws EventStorageException {
+        try {
+            events.delete(event.getSubject());
+        } catch (final ObjectStorageException cause) {
+            throw rewrap(cause);
+        }
+    }
+
+    /**
+     * Wraps a storage failure, keeping its message and the original as the cause.
+     */
+    private static EventStorageException rewrap(final ObjectStorageException cause) {
+        return new EventStorageException(cause.getMessage(), cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
new file mode 100644
index 0000000..352dcb6
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INSTANT;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_END;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_START;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.GeoPolicy;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.TemporalPolicy;
+import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery;
+import org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.MalformedQueryException;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+import jline.internal.Log;
+
+/**
+ * Storage strategy for the Mongo geo-temporal index.
+ * <p>
+ * It serializes a {@link RyaStatement} into either a geo document or a time
+ * document, and it turns collections of geo and temporal {@link IndexingExpr}
+ * filters into a Mongo query object. The geo and temporal work is delegated to a
+ * {@link GeoMongoDBStorageStrategy} and a {@link TemporalMongoDBStorageStrategy}.
+ */
+public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
+    private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class);
+    private static final String GEO_KEY = "location";
+    private static final String TIME_KEY = "time";
+    private final TemporalMongoDBStorageStrategy temporalStrategy;
+    private final GeoMongoDBStorageStrategy geoStrategy;
+
+    /**
+     * Creates the strategy together with the geo and temporal strategies it delegates to.
+     */
+    public GeoTemporalMongoDBStorageStrategy() {
+        geoStrategy = new GeoMongoDBStorageStrategy(0);
+        temporalStrategy = new TemporalMongoDBStorageStrategy();
+    }
+
+    /**
+     * Creates one index on the geo key and one on the time key.
+     *
+     * @param coll - The collection to index.
+     */
+    @Override
+    public void createIndices(final DBCollection coll){
+        coll.createIndex(GEO_KEY);
+        coll.createIndex(TIME_KEY);
+    }
+
+    /**
+     * Builds the Mongo query for a set of geo and temporal filters.
+     * <p>
+     * When both kinds of filter are supplied the two groups are combined with
+     * an "and". When only one kind is supplied just that group is returned, and
+     * when neither is supplied the result is an empty query.
+     *
+     * @param geoFilters - The geo filters to apply. May be empty.
+     * @param temporalFilters - The temporal filters to apply. May be empty.
+     * @return The query object described above.
+     * @throws GeoTemporalIndexException Declared for callers; the filter
+     *   conversion itself logs and skips filters it cannot convert.
+     */
+    public DBObject getFilterQuery(final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters) throws GeoTemporalIndexException {
+        final QueryBuilder builder = QueryBuilder.start();
+
+        if(!geoFilters.isEmpty()) {
+            final DBObject[] geo = getGeoObjs(geoFilters);
+            if(!temporalFilters.isEmpty()) {
+                final DBObject[] temporal = getTemporalObjs(temporalFilters);
+                builder.and(oneOrAnd(geo), oneOrAnd(temporal));
+                return builder.get();
+            } else {
+                return oneOrAnd(geo);
+            }
+        } else if(!temporalFilters.isEmpty()) {
+            final DBObject[] temporal = getTemporalObjs(temporalFilters);
+            return oneOrAnd(temporal);
+        } else {
+            return builder.get();
+        }
+    }
+
+    /**
+     * Returns the only element as is, or an "and" of all the elements otherwise.
+     */
+    private DBObject oneOrAnd(final DBObject[] dbos) {
+        if(dbos.length == 1) {
+            return dbos[0];
+        }
+        return QueryBuilder.start()
+            .and(dbos)
+            .get();
+    }
+
+    /**
+     * Serializes a statement into the document stored by this index.
+     * <p>
+     * The document id is the hash code of the statement's subject. Objects
+     * whose datatype is one of the WKT/GML geo datatypes are stored as points
+     * under the geo key; every other object is stored under the time key.
+     *
+     * @param ryaStatement - The statement to serialize.
+     * @return The document, or {@code null} when the geometry could not be parsed.
+     */
+    @Override
+    public DBObject serialize(final RyaStatement ryaStatement) {
+        final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start("_id", ryaStatement.getSubject().hashCode());
+        final URI obj = ryaStatement.getObject().getDataType();
+
+
+        if(obj.equals(GeoConstants.GEO_AS_WKT) || obj.equals(GeoConstants.GEO_AS_GML) ||
+           obj.equals(GeoConstants.XMLSCHEMA_OGC_GML) || obj.equals(GeoConstants.XMLSCHEMA_OGC_WKT)) {
+            try {
+                final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+                final Geometry geo = GeoParseUtils.getGeometry(statement);
+                builder.add(GEO_KEY, geoStrategy.getCorrespondingPoints(geo));
+            } catch (final ParseException e) {
+                LOG.error("Could not create geometry for statement " + ryaStatement, e);
+                return null;
+            }
+        } else {
+            builder.add(TIME_KEY, temporalStrategy.getTimeValue(ryaStatement.getObject().getData()));
+        }
+        return builder.get();
+    }
+
+    /**
+     * Converts each geo filter into a query object. A filter whose geometry
+     * cannot be parsed, or whose function is not supported, is logged and skipped.
+     */
+    private DBObject[] getGeoObjs(final Collection<IndexingExpr> geoFilters) {
+        final List<DBObject> objs = new ArrayList<>();
+        //This reader is what is used in the GeoIndexer.
+        final WKTReader reader = new WKTReader();
+        for(final IndexingExpr filter : geoFilters) {
+            final GeoPolicy policy = GeoPolicy.fromURI(filter.getFunction());
+            final String geoStr = filter.getArguments()[0].stringValue();
+            try {
+                final Geometry geo = reader.read(geoStr);
+                objs.add(getGeoObject(geo, policy));
+            } catch (final GeoTemporalIndexException | UnsupportedOperationException | ParseException e) {
+                // Log through this class's logger rather than jline's internal console log.
+                LOG.error("Unable to parse '" + geoStr + "'.", e);
+            }
+        }
+        return objs.toArray(new DBObject[]{});
+    }
+
+    /**
+     * Converts each temporal filter into a query object. The first argument of
+     * a filter is treated as an interval when it matches
+     * {@link TemporalInstantRfc3339#PATTERN}, and as an instant otherwise.
+     */
+    private DBObject[] getTemporalObjs(final Collection<IndexingExpr> temporalFilters) {
+        final List<DBObject> objs = new ArrayList<>();
+        for(final IndexingExpr filter : temporalFilters) {
+            final TemporalPolicy policy = TemporalPolicy.fromURI(filter.getFunction());
+            final String timeStr = filter.getArguments()[0].stringValue();
+            final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(timeStr);
+            if(matcher.find()) {
+                final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(timeStr);
+                if(isInstantToInstant(policy) && interval == null) {
+                    LOG.error("Cannot perform temporal interval based queries on an instant.");
+                }
+                objs.add(getTemporalObject(interval, policy));
+            } else {
+                final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(timeStr));
+                if(!isInstantToInstant(policy)) {
+                    LOG.error("Cannot perform temporal instant based queries on an interval.");
+                }
+                objs.add(getTemporalObject(instant, policy));
+            }
+        }
+        return objs.toArray(new DBObject[]{});
+    }
+
+    /**
+     * Tells whether the policy compares an instant against another instant.
+     */
+    private static boolean isInstantToInstant(final TemporalPolicy policy) {
+        return policy == TemporalPolicy.INSTANT_AFTER_INSTANT  ||
+               policy == TemporalPolicy.INSTANT_BEFORE_INSTANT ||
+               policy == TemporalPolicy.INSTANT_EQUALS_INSTANT;
+    }
+
+    /**
+     * Builds the geo query for one geometry and policy.
+     *
+     * @throws UnsupportedOperationException The policy is one that is not supported in Mongo DB.
+     * @throws GeoTemporalIndexException The geo strategy reported a malformed query.
+     */
+    private DBObject getGeoObject (final Geometry geo, final GeoPolicy policy) throws GeoTemporalIndexException {
+        switch(policy) {
+            case CONTAINS:
+                throw new UnsupportedOperationException("Contains queries are not supported in Mongo DB.");
+            case CROSSES:
+                throw new UnsupportedOperationException("Crosses queries are not supported in Mongo DB.");
+            case DISJOINT:
+                throw new UnsupportedOperationException("Disjoint queries are not supported in Mongo DB.");
+            case EQUALS:
+                try {
+                    return geoStrategy.getQuery(new GeoQuery(EQUALS, geo));
+                } catch (final MalformedQueryException e) {
+                    throw new GeoTemporalIndexException(e.getMessage(), e);
+                }
+            case INTERSECTS:
+                try {
+                    return geoStrategy.getQuery(new GeoQuery(INTERSECTS, geo));
+                } catch (final MalformedQueryException e) {
+                    throw new GeoTemporalIndexException(e.getMessage(), e);
+                }
+            case OVERLAPS:
+                throw new UnsupportedOperationException("Overlaps queries are not supported in Mongo DB.");
+            case TOUCHES:
+                throw new UnsupportedOperationException("Touches queries are not supported in Mongo DB.");
+            case WITHIN:
+                try {
+                    return geoStrategy.getQuery(new GeoQuery(WITHIN, geo));
+                } catch (final MalformedQueryException e) {
+                    throw new GeoTemporalIndexException(e.getMessage(), e);
+                }
+            default:
+                return new BasicDBObject();
+        }
+    }
+
+    /**
+     * Builds the query that compares the stored instant with the given instant.
+     * Policies that are not instant-to-instant yield an empty query object.
+     */
+    private DBObject getTemporalObject(final TemporalInstant instant, final TemporalPolicy policy) {
+        final DBObject temporalObj;
+        switch(policy) {
+            case INSTANT_AFTER_INSTANT:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .greaterThan(instant.getAsDateTime().toDate())
+                       .get();
+                break;
+            case INSTANT_BEFORE_INSTANT:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .lessThan(instant.getAsDateTime().toDate())
+                       .get();
+                break;
+            case INSTANT_EQUALS_INSTANT:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .is(instant.getAsDateTime().toDate())
+                       .get();
+                break;
+             default:
+                 temporalObj = new BasicDBObject();
+        }
+        return temporalObj;
+    }
+
+    /**
+     * Builds the query that compares the stored instant, or the stored interval
+     * bounds, with the given interval. Policies that do not involve an interval
+     * yield an empty query object.
+     */
+    private DBObject getTemporalObject(final TemporalInterval interval, final TemporalPolicy policy) {
+        final DBObject temporalObj;
+        switch(policy) {
+            case INSTANT_AFTER_INTERVAL:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+                       .get();
+                break;
+            case INSTANT_BEFORE_INTERVAL:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+                       .get();
+                break;
+            case INSTANT_END_INTERVAL:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .is(interval.getHasEnd().getAsDateTime().toDate())
+                       .get();
+                break;
+            case INSTANT_IN_INTERVAL:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .greaterThan(interval.getHasBeginning().getAsDateTime().toDate())
+                       .lessThan(interval.getHasEnd().getAsDateTime().toDate())
+                       .get();
+                break;
+            case INSTANT_START_INTERVAL:
+                temporalObj = QueryBuilder.start(INSTANT)
+                       .is(interval.getHasBeginning().getAsDateTime().toDate())
+                       .get();
+                break;
+            case INTERVAL_AFTER:
+                temporalObj = QueryBuilder.start(INTERVAL_START)
+                       .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+                       .get();
+                break;
+            case INTERVAL_BEFORE:
+                temporalObj = QueryBuilder.start(INTERVAL_END)
+                       .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+                       .get();
+                break;
+            case INTERVAL_EQUALS:
+                temporalObj = QueryBuilder.start(INTERVAL_START)
+                       .is(interval.getHasBeginning().getAsDateTime().toDate())
+                       .and(INTERVAL_END)
+                       .is(interval.getHasEnd().getAsDateTime().toDate())
+                       .get();
+                break;
+             default:
+                 temporalObj = new BasicDBObject();
+        }
+        return temporalObj;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
new file mode 100644
index 0000000..8ddf075
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.entity.model.TypedEntity;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
+import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBObject;
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+
+/**
+ * An {@link EventStorage} that keeps {@link Event}s in a MongoDB collection.
+ * <p>
+ * The database is named after the Rya instance and the collection is
+ * {@link #COLLECTION_NAME}. Events are keyed by their subject.
+ */
+public class MongoEventStorage implements EventStorage {
+
+    protected static final String COLLECTION_NAME = "geotemporal-events";
+
+    private static final EventDocumentConverter EVENT_CONVERTER = new EventDocumentConverter();
+
+    /**
+     * A client connected to the Mongo instance that hosts the Rya instance.
+     */
+    protected final MongoClient mongo;
+
+    /**
+     * The name of the Rya instance the {@link Event}s are for.
+     */
+    protected final String ryaInstanceName;
+
+    /*
+     * Used to get the filter query objects.
+     */
+    private final GeoTemporalMongoDBStorageStrategy queryAdapter;
+
+    /**
+     * Constructs an instance of {@link MongoEventStorage}.
+     *
+     * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null)
+     * @param ryaInstanceName - The name of the Rya instance the {@link Event}s are for. (not null)
+     */
+    public MongoEventStorage(final MongoClient mongo, final String ryaInstanceName) {
+        this.mongo = requireNonNull(mongo);
+        this.ryaInstanceName = requireNonNull(ryaInstanceName);
+        queryAdapter = new GeoTemporalMongoDBStorageStrategy();
+    }
+
+    /**
+     * Inserts a new event.
+     *
+     * @param event - The event to store. (not null)
+     * @throws EventAlreadyExistsException Mongo reported a duplicate key for the event's subject.
+     * @throws EventStorageException Any other Mongo failure.
+     */
+    @Override
+    public void create(final Event event) throws EventStorageException {
+        requireNonNull(event);
+
+        try {
+            events().insertOne(EVENT_CONVERTER.toDocument(event));
+        } catch(final MongoException e) {
+            final ErrorCategory category = ErrorCategory.fromErrorCode( e.getCode() );
+            if(category == ErrorCategory.DUPLICATE_KEY) {
+                throw new EventAlreadyExistsException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e);
+            }
+            throw new EventStorageException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Fetches the event stored for a subject.
+     *
+     * @param subject - The subject to look up. (not null)
+     * @return The stored event, or empty when no document matches the subject.
+     * @throws EventStorageException Mongo failed or the document could not be converted.
+     */
+    @Override
+    public Optional<Event> get(final RyaURI subject) throws EventStorageException {
+        requireNonNull(subject);
+
+        try {
+            final Document document = events()
+                .find( makeSubjectFilter(subject) )
+                .first();
+
+            return document == null ?
+                    Optional.empty() :
+                    Optional.of( EVENT_CONVERTER.fromDocument(document) );
+
+        } catch(final MongoException | DocumentConverterException e) {
+            throw new EventStorageException("Could not get the Event with Subject '" + subject.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Finds the events that satisfy the supplied filters.
+     *
+     * @param subject - Restricts the search to one subject when present. (not null)
+     * @param geoFilters - Geo filters to apply; an absent value means none. (not null)
+     * @param temporalFilters - Temporal filters to apply; an absent value means none. (not null)
+     * @return The matching events, possibly empty.
+     * @throws EventStorageException Mongo failed, a document could not be
+     *   converted, or the filter query could not be built.
+     */
+    @Override
+    public Collection<Event> search(final Optional<RyaURI> subject, final Optional<Collection<IndexingExpr>> geoFilters, final Optional<Collection<IndexingExpr>> temporalFilters) throws EventStorageException {
+        requireNonNull(subject);
+        requireNonNull(geoFilters);
+        requireNonNull(temporalFilters);
+
+        try {
+            final Collection<IndexingExpr> geos = geoFilters.orElseGet(ArrayList::new);
+            final Collection<IndexingExpr> tempos = temporalFilters.orElseGet(ArrayList::new);
+            final DBObject filterObj = queryAdapter.getFilterQuery(geos, tempos);
+
+            final BasicDBObjectBuilder builder = BasicDBObjectBuilder
+            .start(filterObj.toMap());
+            if(subject.isPresent()) {
+                builder.append(EventDocumentConverter.SUBJECT, subject.get().getData());
+            }
+
+            final List<Event> events = new ArrayList<>();
+            // The cursor holds server side resources, so close it even when a conversion fails.
+            try(final MongoCursor<Document> results = events()
+                    .find( BsonDocument.parse(builder.get().toString()) )
+                    .iterator()) {
+                while(results.hasNext()) {
+                    events.add(EVENT_CONVERTER.fromDocument(results.next()));
+                }
+            }
+            return events;
+        } catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) {
+            throw new EventStorageException("Could not get the Event.", e);
+        }
+    }
+
+    /**
+     * Replaces the stored event that has the old event's subject.
+     *
+     * @param old - The event being replaced. (not null)
+     * @param updated - The replacement; must have the same subject as {@code old}. (not null)
+     * @throws StaleUpdateException No stored event matched the subject.
+     * @throws EventStorageException The subjects differ, or Mongo failed.
+     */
+    @Override
+    public void update(final Event old, final Event updated) throws StaleUpdateException, EventStorageException {
+        requireNonNull(old);
+        requireNonNull(updated);
+
+        // The updated entity must have the same Subject as the one it is replacing.
+        if(!old.getSubject().equals(updated.getSubject())) {
+            throw new EventStorageException("The old Event and the updated Event must have the same Subject. " +
+                    "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData());
+        }
+
+        final Document updatedDoc = EVENT_CONVERTER.toDocument(updated);
+
+        // Do a find and replace on the document that matches the old entity's Subject.
+        final Document replaced;
+        try {
+            replaced = events().findOneAndReplace(makeSubjectFilter(old.getSubject()), updatedDoc);
+        } catch(final MongoException e) {
+            // Report Mongo failures the same way create(), get() and delete() do.
+            throw new EventStorageException("Could not update the Event with Subject '" + updated.getSubject().getData() + "'.", e);
+        }
+
+        if(replaced == null) {
+            throw new StaleUpdateException("Could not update the Event with Subject '" + updated.getSubject().getData() + "'.");
+        }
+    }
+
+    /**
+     * Deletes the event stored for a subject.
+     *
+     * @param subject - The subject of the event to delete. (not null)
+     * @return {@code true} when a document was deleted; {@code false} when none matched.
+     * @throws EventStorageException Mongo failed.
+     */
+    @Override
+    public boolean delete(final RyaURI subject) throws EventStorageException {
+        requireNonNull(subject);
+
+        try {
+            final Document deleted = events()
+                .findOneAndDelete( makeSubjectFilter(subject) );
+
+            return deleted != null;
+
+        } catch(final MongoException e) {
+            throw new EventStorageException("Could not delete the Event with Subject '" + subject.getData() + "'.", e);
+        }
+    }
+
+    /**
+     * Returns the collection this storage reads from and writes to.
+     */
+    private MongoCollection<Document> events() {
+        return mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME);
+    }
+
+    /**
+     * Builds the filter that matches the document stored for a subject.
+     */
+    private static Bson makeSubjectFilter(final RyaURI subject) {
+        return Filters.eq(EventDocumentConverter.SUBJECT, subject.getData());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
new file mode 100644
index 0000000..1baab18
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.mongodb.AbstractMongoIndexer;
+import org.apache.rya.indexing.mongodb.IndexingException;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+
+/**
+ * Indexer that stores 2 separate statements as one 'Event' entity.
+ * <p>
+ * The statements are required to have the same subject, one must be
+ * a Geo based statement and the other a temporal based statement.
+ * <p>
+ * This indexer is later used when querying for geo/temporal statements
+ * in the format of:
+ * <pre>
+ * {@code
+ * QUERY PARAMS
+ *   ?SomeSubject geo:predicate ?Location
+ *   ?SomeSubject time:predicate ?Time
+ *   Filter(?Location, geoFunction())
+ *   Filter(?Time, temporalFunction())
+ * }
+ * </pre>
+ *
+ * The number of filters is not strict, but there must be at least one
+ * query pattern for geo and one for temporal as well as at least one
+ * filter for each type.
+ */
+public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMongoDBStorageStrategy> implements GeoTemporalIndexer {
+    private static final Logger LOG = Logger.getLogger(MongoGeoTemporalIndexer.class);
+    public static final String GEO_TEMPORAL_COLLECTION = "geo_temporal";
+
+    private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>();
+    private final AtomicReference<EventStorage> events = new AtomicReference<>();
+
+    /**
+     * Initializes the indexer: runs the core initialization, collects the geo and
+     * temporal predicates from the configuration and installs the storage strategy.
+     */
+    @Override
+    public void init() {
+        initCore();
+        predicates = ConfigUtils.getGeoPredicates(conf);
+        predicates.addAll(ConfigUtils.getTemporalPredicates(conf));
+        storageStrategy = new GeoTemporalMongoDBStorageStrategy();
+    }
+
+    /**
+     * Sets the configuration of this indexer and (re)creates the {@link EventStorage} from it.
+     *
+     * @param conf - The configuration to use. (not null)
+     */
+    @Override
+    public void setConf(final Configuration conf) {
+        requireNonNull(conf);
+        // Drop any cached storage first so getEventStorage() builds one for the new configuration.
+        events.set(null);
+        events.set(getEventStorage(conf));
+        super.conf = conf;
+        configuration.set(new MongoDBRdfConfiguration(conf));
+    }
+
+    /**
+     * Adds the geo or temporal value carried by the statement to the Event of the statement's subject.
+     *
+     * @param ryaStatement - The statement to index. (not null)
+     * @throws IOException The Event could not be updated.
+     */
+    @Override
+    public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+        requireNonNull(ryaStatement);
+
+        try {
+            updateEvent(ryaStatement.getSubject(), ryaStatement);
+        } catch (IndexingException | ParseException e) {
+            throw new IOException("Failed to update the Entity index.", e);
+        }
+    }
+
+    /**
+     * Removes the geo or temporal value carried by the statement from the Event of the
+     * statement's subject. The value is only cleared when it equals the value currently
+     * held by the Event; when no Event exists for the subject nothing is changed.
+     *
+     * @param statement - The statement whose value is removed from the index. (not null)
+     * @throws IOException The Event could not be updated.
+     */
+    @Override
+    public void deleteStatement(final RyaStatement statement) throws IOException {
+        requireNonNull(statement);
+        final RyaURI subject = statement.getSubject();
+        try {
+            final EventStorage eventStore = events.get();
+            // Check the stored value, not the AtomicReference holder (which is never null).
+            checkState(eventStore != null, "Must set this indexers configuration before storing statements.");
+
+            new EventUpdater(eventStore).update(subject, old -> {
+                if(!old.isPresent()) {
+                    return Optional.empty();
+                }
+
+                final Event.Builder updated = Event.builder(old.get());
+                final Event currentEvent = updated.build();
+                final URI datatype = statement.getObject().getDataType();
+                if(isGeoDatatype(datatype)) {
+                    // A geo statement never falls through to the temporal branch, even
+                    // when the Event currently has no geometry to remove.
+                    if(currentEvent.getGeometry().isPresent()) {
+                        try {
+                            final Geometry toRemove = GeoParseUtils.getGeometry(RyaToRdfConversions.convertStatement(statement));
+                            if(currentEvent.getGeometry().get().equals(toRemove)) {
+                                updated.setGeometry(null);
+                            }
+                        } catch (final Exception e) {
+                            // Best effort: an unparsable geometry cannot match the stored one.
+                            LOG.debug("Unable to parse the stored geometry.", e);
+                        }
+                    }
+                } else {
+                    //is time
+                    final String dateTime = statement.getObject().getData();
+                    final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+                    if (matcher.find()) {
+                        final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+                        // Only compare when the Event actually holds an interval.
+                        if(currentEvent.getInterval().isPresent() && currentEvent.getInterval().get().equals(interval)) {
+                            updated.setTemporalInterval(null);
+                        }
+                    } else {
+                        final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+                        // Only compare when the Event actually holds an instant.
+                        if(currentEvent.getInstant().isPresent() && currentEvent.getInstant().get().equals(instant)) {
+                            updated.setTemporalInstant(null);
+                        }
+                    }
+                }
+                return Optional.of(updated.build());
+            });
+        } catch (final IndexingException e) {
+            throw new IOException("Failed to update the Entity index.", e);
+        }
+    }
+
+    /**
+     * Creates or updates the Event of the subject with the geo or temporal value of the statement.
+     *
+     * @param subject - The subject of the Event to update.
+     * @param statement - The statement carrying the geometry, interval or instant to store.
+     * @throws IndexingException The Event could not be updated.
+     * @throws ParseException Declared for callers; geometry parse failures inside the
+     *   update are logged rather than propagated.
+     */
+    private void updateEvent(final RyaURI subject, final RyaStatement statement) throws IndexingException, ParseException {
+        final EventStorage eventStore = events.get();
+        // Check the stored value, not the AtomicReference holder (which is never null).
+        checkState(eventStore != null, "Must set this indexers configuration before storing statements.");
+
+        new EventUpdater(eventStore).update(subject, old -> {
+            final Event.Builder updated;
+            if(!old.isPresent()) {
+                updated = Event.builder()
+                    .setSubject(subject);
+            } else {
+                updated = Event.builder(old.get());
+            }
+
+            final URI datatype = statement.getObject().getDataType();
+            if(isGeoDatatype(datatype)) {
+                //is geo
+                try {
+                    final Statement geoStatement = RyaToRdfConversions.convertStatement(statement);
+                    final Geometry geometry = GeoParseUtils.getGeometry(geoStatement);
+                    updated.setGeometry(geometry);
+                } catch (final ParseException e) {
+                    LOG.error(e.getMessage(), e);
+                }
+            } else {
+                //is time
+                final String dateTime = statement.getObject().getData();
+                final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+                if (matcher.find()) {
+                    final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+                    updated.setTemporalInterval(interval);
+                } else {
+                    final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+                    updated.setTemporalInstant(instant);
+                }
+            }
+            return Optional.of(updated.build());
+        });
+    }
+
+    /**
+     * Tells whether a literal datatype is one of the geo datatypes handled by this indexer
+     * (WKT or GML, in either of the namespaces listed in {@link GeoConstants}).
+     *
+     * @param datatype - The datatype of a statement's object; may be null.
+     * @return {@code true} when the datatype is one of the four geo datatypes.
+     */
+    private static boolean isGeoDatatype(final URI datatype) {
+        // Constant-first comparisons keep this safe for a null datatype.
+        return GeoConstants.GEO_AS_WKT.equals(datatype)
+            || GeoConstants.GEO_AS_GML.equals(datatype)
+            || GeoConstants.XMLSCHEMA_OGC_WKT.equals(datatype)
+            || GeoConstants.XMLSCHEMA_OGC_GML.equals(datatype);
+    }
+
+    /**
+     * @return The configured table prefix followed by {@link #GEO_TEMPORAL_COLLECTION}.
+     */
+    @Override
+    public String getCollectionName() {
+        return ConfigUtils.getTablePrefix(conf)  + GEO_TEMPORAL_COLLECTION;
+    }
+
+    /**
+     * Returns the cached {@link EventStorage}, creating and caching a
+     * {@link MongoEventStorage} from the configuration when none exists yet.
+     *
+     * @param conf - The configuration used to obtain the Mongo client and database name.
+     * @return The event storage used by this indexer.
+     */
+    @Override
+    public EventStorage getEventStorage(final Configuration conf) {
+        final EventStorage cached = events.get();
+        if(cached != null) {
+            return cached;
+        }
+
+        // setConf() accepts any Configuration, so wrap it instead of casting blindly.
+        final MongoDBRdfConfiguration mongoConf = conf instanceof MongoDBRdfConfiguration
+            ? (MongoDBRdfConfiguration) conf
+            : new MongoDBRdfConfiguration(conf);
+        mongoClient = mongoConf.getMongoClient();
+        if (mongoClient == null) {
+            mongoClient = MongoConnectorFactory.getMongoClient(conf);
+        }
+        final String ryaInstanceName = mongoConf.getMongoDBName();
+        final EventStorage created = new MongoEventStorage(mongoClient, ryaInstanceName);
+        events.set(created);
+        return created;
+    }
+}