You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by dl...@apache.org on 2017/08/30 20:31:51 UTC

[13/14] incubator-rya git commit: RYA-324, RYA-272 Geo refactoring and examples closes #182

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java
new file mode 100644
index 0000000..c4a287e
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.rya.indexing.external.matching.AbstractExternalSetMatcherFactory;
+import org.apache.rya.indexing.external.matching.ExternalSetMatcher;
+import org.apache.rya.indexing.external.matching.JoinSegment;
+import org.apache.rya.indexing.external.matching.JoinSegmentMatcher;
+import org.apache.rya.indexing.external.matching.OptionalJoinSegment;
+import org.apache.rya.indexing.external.matching.OptionalJoinSegmentMatcher;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+
+/**
+ * Factory that builds the {@link ExternalSetMatcher}s used to match {@link EventQueryNode}s
+ * against {@link JoinSegment}s and {@link OptionalJoinSegment}s.  Every matcher is given a
+ * fresh {@link GeoTemporalToSegmentConverter}.
+ */
+public class GeoTemporalExternalSetMatcherFactory extends AbstractExternalSetMatcherFactory<EventQueryNode> {
+
+    @Override
+    protected ExternalSetMatcher<EventQueryNode> getJoinSegmentMatcher(final JoinSegment<EventQueryNode> segment) {
+        final GeoTemporalToSegmentConverter converter = new GeoTemporalToSegmentConverter();
+        return new JoinSegmentMatcher<>(segment, converter);
+    }
+
+    @Override
+    protected ExternalSetMatcher<EventQueryNode> getOptionalJoinSegmentMatcher(final OptionalJoinSegment<EventQueryNode> segment) {
+        final GeoTemporalToSegmentConverter converter = new GeoTemporalToSegmentConverter();
+        return new OptionalJoinSegmentMatcher<>(segment, converter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java
new file mode 100644
index 0000000..b2d4de5
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.rya.indexing.entity.model.TypedEntity;
+
+/**
+ * Checked exception signalling that an operation against the geotemporal index
+ * failed to complete.
+ * <p>
+ * NOTE(review): the previous wording named the {@link TypedEntity} index; given this
+ * class's name and package that was presumably a copy/paste slip -- confirm.
+ */
+public class GeoTemporalIndexException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs a new exception with the specified detail message.  The
+     * cause is not initialized, and may subsequently be initialized by
+     * a call to {@link #initCause}.
+     *
+     * @param   message   the detail message. The detail message is saved for
+     *          later retrieval by the {@link #getMessage()} method.
+     */
+    public GeoTemporalIndexException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and
+     * cause.  <p>Note that the detail message associated with
+     * {@code cause} is <i>not</i> automatically incorporated in
+     * this exception's detail message.
+     *
+     * @param  message the detail message (which is saved for later retrieval
+     *         by the {@link #getMessage()} method).
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A {@code null} value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     */
+    public GeoTemporalIndexException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
new file mode 100644
index 0000000..bf12f26
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IndexingFunctionRegistry;
+import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
+import org.apache.rya.indexing.external.matching.ExternalSetProvider;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode.EventQueryNodeBuilder;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Provides {@link GeoTupleSet}s.
+ */
+public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQueryNode> {
+    private static final Logger LOG = Logger.getLogger(GeoTemporalIndexSetProvider.class);
+
+    //organzied by object var.  Each object is a filter, or set of filters
+    private Multimap<Var, IndexingExpr> filterMap;
+
+    //organzied by subject var.  Each subject is a GeoTemporalTupleSet
+    private Multimap<Var, StatementPattern> patternMap;
+
+    //filters that have not been constrained by statement patterns into indexing expressions yet.
+    private Multimap<Var, FunctionCall> unmatchedFilters;
+    //filters that have been used, to be used by the matcher later.
+    private Multimap<Var, FunctionCall> matchedFilters;
+
+    //organzied by object var.  Used to find matches between unmatch filters and patterns
+    private Map<Var, StatementPattern> objectPatterns;
+
+
+    private static URI filterURI;
+
+    private final EventStorage eventStorage;
+
+    public GeoTemporalIndexSetProvider(final EventStorage eventStorage) {
+        this.eventStorage = requireNonNull(eventStorage);
+    }
+
+    @Override
+    public List<EventQueryNode> getExternalSets(final QuerySegment<EventQueryNode> node) {
+        filterMap = HashMultimap.create();
+        patternMap = HashMultimap.create();
+        unmatchedFilters = HashMultimap.create();
+        matchedFilters = HashMultimap.create();
+
+        objectPatterns = new HashMap<>();
+        //discover entities
+        buildMaps(node);
+        final List<EventQueryNode> nodes = createNodes();
+
+        return nodes;
+    }
+
+    private List<EventQueryNode> createNodes() {
+        final List<EventQueryNode> nodes = new ArrayList<>();
+        for(final Var subj : patternMap.keySet()) {
+            final EventQueryNode node = getGeoTemporalNode(subj);
+            if(node != null) {
+                nodes.add(node);
+            }
+        }
+        return nodes;
+    }
+
+    private EventQueryNode getGeoTemporalNode(final Var subj) {
+        final Collection<StatementPattern> patterns = patternMap.get(subj);
+        final Collection<FunctionCall> usedFilters = new ArrayList<>();
+        Optional<StatementPattern> geoPattern = Optional.empty();
+        Optional<StatementPattern> temporalPattern = Optional.empty();
+        Optional<Collection<IndexingExpr>> geoFilters = Optional.empty();
+        Optional<Collection<IndexingExpr>> temporalFilters = Optional.empty();
+
+        //should only be 2 patterns.
+        for(final StatementPattern sp : patterns) {
+            final Var obj = sp.getObjectVar();
+
+            ///filter map does not have -const-
+
+
+            if(filterMap.containsKey(obj)) {
+                final Collection<IndexingExpr> filters = filterMap.get(obj);
+                final IndexingFunctionRegistry.FUNCTION_TYPE type = ensureSameType(filters);
+                if(type != null && type == FUNCTION_TYPE.GEO) {
+                    geoPattern = Optional.of(sp);
+                    geoFilters = Optional.of(filters);
+                    usedFilters.addAll(matchedFilters.get(obj));
+                } else if(type != null && type == FUNCTION_TYPE.TEMPORAL) {
+                    temporalPattern = Optional.of(sp);
+                    temporalFilters = Optional.of(filters);
+                    usedFilters.addAll(matchedFilters.get(obj));
+                } else {
+                    return null;
+                }
+            } else {
+                return null;
+            }
+        }
+
+        if(geoFilters.isPresent() && temporalFilters.isPresent() && geoPattern.isPresent() && temporalPattern.isPresent()) {
+            return new EventQueryNodeBuilder()
+                    .setStorage(eventStorage)
+                    .setGeoPattern(geoPattern.get())
+                    .setTemporalPattern(temporalPattern.get())
+                    .setGeoFilters(geoFilters.get())
+                    .setTemporalFilters(temporalFilters.get())
+                    .setUsedFilters(usedFilters)
+                    .build();
+        } else {
+            return null;
+        }
+    }
+
+    private static FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) {
+        FUNCTION_TYPE type = null;
+        for(final IndexingExpr filter : filters) {
+            if(type == null) {
+                type = IndexingFunctionRegistry.getFunctionType(filter.getFunction());
+            } else {
+                if(IndexingFunctionRegistry.getFunctionType(filter.getFunction()) != type) {
+                    return null;
+                }
+            }
+        }
+        return type;
+    }
+
+    private void buildMaps(final QuerySegment<EventQueryNode> node) {
+        final List<QueryModelNode> unused = new ArrayList<>();
+        for (final QueryModelNode pattern : node.getOrderedNodes()) {
+            if(pattern instanceof FunctionCall) {
+                discoverFilter((FunctionCall) pattern, unused);
+            }
+            if(pattern instanceof StatementPattern) {
+                discoverPatterns((StatementPattern) pattern, unused);
+            }
+        }
+    }
+
+    private void discoverFilter(final FunctionCall filter, final List<QueryModelNode> unmatched) {
+        try {
+            filter.visit(new FilterVisitor());
+        } catch (final Exception e) {
+            LOG.error("Failed to match the filter object.", e);
+        }
+    }
+
+    private void discoverPatterns(final StatementPattern pattern, final List<QueryModelNode> unmatched) {
+        final Var subj = pattern.getSubjectVar();
+        final Var objVar = pattern.getObjectVar();
+
+        patternMap.put(subj, pattern);
+        objectPatterns.put(objVar, pattern);
+        //check for existing filters.
+        if(unmatchedFilters.containsKey(objVar)) {
+            final Collection<FunctionCall> calls = unmatchedFilters.removeAll(objVar);
+            for(final FunctionCall call : calls) {
+                addFilter(call);
+                matchedFilters.put(objVar, call);
+            }
+        }
+    }
+
+    @Override
+    public Iterator<List<EventQueryNode>> getExternalSetCombos(final QuerySegment<EventQueryNode> segment) {
+        final List<List<EventQueryNode>> comboList = new ArrayList<>();
+        comboList.add(getExternalSets(segment));
+        return comboList.iterator();
+    }
+
+    private void addFilter(final FunctionCall call) {
+        filterURI = new URIImpl(call.getURI());
+        final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
+        filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call)));
+    }
+
+    /**
+     * Finds the object/function in a Filter.  If the associated statement pattern
+     * has been found, creates the {@link IndexingExpr} and adds it to the map.
+     */
+    private class FilterVisitor extends QueryModelVisitorBase<Exception> {
+        @Override
+        public void meet(final FunctionCall call) throws Exception {
+            filterURI = new URIImpl(call.getURI());
+            final FUNCTION_TYPE type = IndexingFunctionRegistry.getFunctionType(filterURI);
+            if(type == FUNCTION_TYPE.GEO || type == FUNCTION_TYPE.TEMPORAL) {
+                final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
+                if(objectPatterns.containsKey(objVar)) {
+                    filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call)));
+                    matchedFilters.put(objVar, call);
+                } else {
+                    unmatchedFilters.put(objVar, call);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
new file mode 100644
index 0000000..106588b
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.URIImpl;
+
+/**
+ * A repository to store, index, and retrieve {@link Statement}s based on geotemporal features.
+ */
+public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
+    /**
+     * Initializes the indexer.  Call this after the configuration has been set.
+     */
+    void init();
+
+    /**
+     * Creates the {@link EventStorage} that will be used by the indexer.
+     *
+     * @param conf - Indicates how the {@link EventStorage} is initialized. (not null)
+     * @return The {@link EventStorage} that will be used by this indexer.
+     */
+    EventStorage getEventStorage(final Configuration conf);
+
+    /**
+     * Used to indicate which geo filter functions to use in a query.
+     */
+    enum GeoPolicy {
+        /**
+         * The provided geo object equals the geo object where the event took place.
+         */
+        EQUALS(GeoConstants.GEO_SF_EQUALS),
+
+        /**
+         * The provided geo object does not share any space with the event.
+         */
+        DISJOINT(GeoConstants.GEO_SF_DISJOINT),
+
+        /**
+         * The provided geo object shares some amount of space with the event.
+         */
+        INTERSECTS(GeoConstants.GEO_SF_INTERSECTS),
+
+        /**
+         * The provided geo object shares a point with the event, but only on the edge.
+         */
+        TOUCHES(GeoConstants.GEO_SF_TOUCHES),
+
+        /**
+         * The provided geo object shares some, but not all space with the event.
+         */
+        CROSSES(GeoConstants.GEO_SF_CROSSES),
+
+        /**
+         * The provided geo object exists completely within the event.
+         */
+        WITHIN(GeoConstants.GEO_SF_WITHIN),
+
+        /**
+         * The event took place completely within the provided geo object.
+         */
+        CONTAINS(GeoConstants.GEO_SF_CONTAINS),
+
+        /**
+         * The provided geo object has some but not all points in common with the event,
+         * are of the same dimension, and the intersection of the interiors has the
+         * same dimension as the geometries themselves.
+         */
+        OVERLAPS(GeoConstants.GEO_SF_OVERLAPS);
+
+        private final URI uri;
+
+        GeoPolicy(final URI uri) {
+            this.uri = uri;
+        }
+
+        /**
+         * @return The URI of the filter function this policy stands for.
+         */
+        public URI getURI() {
+            return uri;
+        }
+
+        /**
+         * Looks up the policy whose URI equals the provided one.
+         *
+         * @param uri - The filter function URI to look up.
+         * @return The matching policy, or {@code null} when no policy uses that URI.
+         */
+        public static GeoPolicy fromURI(final URI uri) {
+            for(final GeoPolicy candidate : values()) {
+                if(candidate.uri.equals(uri)) {
+                    return candidate;
+                }
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Namespace prefixed to the local name of every {@link TemporalPolicy} URI.
+     */
+    static final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
+
+    /**
+     * Used to indicate which temporal filter functions to use in a query.
+     */
+    enum TemporalPolicy {
+        /**
+         * The provided instant in time equals the instant the event took place.
+         */
+        INSTANT_EQUALS_INSTANT(true, new URIImpl(TEMPORAL_NS+"equals")),
+
+        /**
+         * The provided instant in time was before when the event took place.
+         */
+        INSTANT_BEFORE_INSTANT(true, new URIImpl(TEMPORAL_NS+"before")),
+
+        /**
+         * The provided instant in time was after when the event took place.
+         */
+        INSTANT_AFTER_INSTANT(true, new URIImpl(TEMPORAL_NS+"after")),
+
+        /**
+         * The provided instant in time was before a time period.
+         */
+        INSTANT_BEFORE_INTERVAL(false, new URIImpl(TEMPORAL_NS+"beforeInterval")),
+
+        /**
+         * The provided instant in time took place within a set of time.
+         */
+        INSTANT_IN_INTERVAL(false, new URIImpl(TEMPORAL_NS+"insideInterval")),
+
+        /**
+         * The provided instant in time took place after a time period.
+         */
+        INSTANT_AFTER_INTERVAL(false, new URIImpl(TEMPORAL_NS+"afterInterval")),
+
+        /**
+         * The provided instant in time equals the start of the interval in which the event took place.
+         */
+        INSTANT_START_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasBeginningInterval")),
+
+        /**
+         * The provided instant in time equals the end of the interval in which the event took place.
+         */
+        INSTANT_END_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasEndInterval")),
+
+        /**
+         * The provided interval equals the interval in which the event took place.
+         */
+        INTERVAL_EQUALS(false, new URIImpl(TEMPORAL_NS+"intervalEquals")),
+
+        /**
+         * The provided interval is before the interval in which the event took place.
+         */
+        INTERVAL_BEFORE(false, new URIImpl(TEMPORAL_NS+"intervalBefore")),
+
+        /**
+         * The provided interval is after the interval in which the event took place.
+         */
+        INTERVAL_AFTER(false, new URIImpl(TEMPORAL_NS+"intervalAfter"));
+
+        private final boolean isInstant;
+        private final URI uri;
+
+        TemporalPolicy(final boolean isInstant, final URI uri) {
+            this.isInstant = isInstant;
+            this.uri = uri;
+        }
+
+        /**
+         * @return The instant flag this policy was declared with.
+         */
+        public boolean isInstant(){
+            return isInstant;
+        }
+
+        /**
+         * @return The URI of the filter function this policy stands for.
+         */
+        public URI getURI() {
+            return uri;
+        }
+
+        /**
+         * Looks up the policy whose URI equals the provided one.
+         *
+         * @param uri - The filter function URI to look up.
+         * @return The matching policy, or {@code null} when no policy uses that URI.
+         */
+        public static TemporalPolicy fromURI(final URI uri) {
+            for(final TemporalPolicy candidate : values()) {
+                if(candidate.uri.equals(uri)) {
+                    return candidate;
+                }
+            }
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java
new file mode 100644
index 0000000..f4df8bc
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.GeoEnabledFilterFunctionOptimizer;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.GeoIndexerType;
+import org.apache.rya.indexing.GeoTemporalIndexerType;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoSecondaryIndex;
+
+/**
+ * Factory that hands out a {@link GeoTemporalIndexer} for a given {@link Configuration}.
+ */
+public class GeoTemporalIndexerFactory {
+    /**
+     * Returns the {@link GeoTemporalIndexer} to use with the provided configuration.
+     * When Mongo is in use, an indexer already listed among the configuration's additional
+     * indexers is reused; otherwise a new one is instantiated, configured and initialized.
+     *
+     * @param conf - The {@link Configuration} to base the {@link GeoTemporalIndexer} on.
+     * @return The {@link GeoTemporalIndexer}, or {@code null} when the configuration does not use Mongo.
+     */
+    public GeoTemporalIndexer getIndexer(final Configuration conf) {
+        if(!ConfigUtils.getUseMongo(conf)) {
+            //TODO: add Accumulo here.
+            return null;
+        }
+
+        final MongoDBRdfConfiguration mongoConf = new MongoDBRdfConfiguration(conf);
+        for(final MongoSecondaryIndex candidate : mongoConf.getAdditionalIndexers()) {
+            if(candidate instanceof GeoTemporalIndexer) {
+                return (GeoTemporalIndexer) candidate;
+            }
+        }
+
+        /* None was configured, so create a MongoGeoTemporalIndexer */
+        final GeoTemporalIndexer created = GeoEnabledFilterFunctionOptimizer.instantiate(GeoTemporalIndexerType.MONGO_GEO_TEMPORAL.getGeoTemporalIndexerClassString(), GeoTemporalIndexer.class);
+        created.setConf(conf);
+        created.init();
+        return created;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java
new file mode 100644
index 0000000..d626adc
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.external.matching.AbstractExternalSetOptimizer;
+import org.apache.rya.indexing.external.matching.ExternalSetMatcher;
+import org.apache.rya.indexing.external.matching.ExternalSetProvider;
+import org.apache.rya.indexing.external.matching.QueryNodeListRater;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * {@link AbstractExternalSetOptimizer} for {@link EventQueryNode}s.  The
+ * {@link GeoTemporalIndexSetProvider} it exposes is created when the configuration is set.
+ */
+public class GeoTemporalOptimizer extends AbstractExternalSetOptimizer<EventQueryNode> implements Configurable {
+    private static final GeoTemporalExternalSetMatcherFactory MATCHER_FACTORY = new GeoTemporalExternalSetMatcherFactory();
+
+    private GeoTemporalIndexer indexer;
+    private GeoTemporalIndexSetProvider provider;
+    private Configuration conf;
+
+    /**
+     * Stores the configuration, obtains the {@link GeoTemporalIndexer} for it and builds
+     * the provider from that indexer's event storage.
+     *
+     * @param conf - The configuration to use.
+     */
+    @Override
+    public void setConf(final Configuration conf) {
+        this.conf = conf;
+        final GeoTemporalIndexerFactory factory = new GeoTemporalIndexerFactory();
+        //NOTE(review): GeoTemporalIndexerFactory returns null when Mongo is not in use,
+        //in which case the getEventStorage call below throws a NullPointerException.
+        indexer = factory.getIndexer(conf);
+
+        //conf here does not matter since EventStorage has already been set in the indexer.
+        provider = new GeoTemporalIndexSetProvider(indexer.getEventStorage(conf));
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    protected ExternalSetMatcher<EventQueryNode> getMatcher(final QuerySegment<EventQueryNode> segment) {
+        return MATCHER_FACTORY.getMatcher(segment);
+    }
+
+    /**
+     * @return The provider built by {@link #setConf(Configuration)}; {@code null} until that has been called.
+     */
+    @Override
+    protected ExternalSetProvider<EventQueryNode> getProvider() {
+        return provider;
+    }
+
+    /**
+     * This optimizer supplies no {@link QueryNodeListRater}.
+     *
+     * @return An absent {@link Optional}; never {@code null}, so callers can safely test
+     *   {@code isPresent()} on the result.
+     */
+    @Override
+    protected Optional<QueryNodeListRater> getNodeListRater(final QuerySegment<EventQueryNode> segment) {
+        return Optional.absent();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java
new file mode 100644
index 0000000..22bfdb1
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.indexing.external.matching.ExternalSetConverter;
+import org.apache.rya.indexing.external.matching.JoinSegment;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.ValueExpr;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ExternalSetConverter} implementation that builds the
+ * {@link QuerySegment} corresponding to an {@link EventQueryNode}.
+ */
+public class GeoTemporalToSegmentConverter implements ExternalSetConverter<EventQueryNode> {
+    /**
+     * Creates a {@link JoinSegment} from the node's statement patterns and filters.
+     *
+     * @param set - The node to convert. (not null)
+     * @return A segment constructed from a node set holding the node's patterns
+     *   and filters, a node list holding only the patterns, and an empty
+     *   filter map.
+     */
+    @Override
+    public QuerySegment<EventQueryNode> setToSegment(final EventQueryNode set) {
+        Preconditions.checkNotNull(set);
+
+        // The list handed to the segment contains the statement patterns only.
+        final List<QueryModelNode> patternList = new ArrayList<>(set.getPatterns());
+
+        // The set handed to the segment contains the patterns plus the node's filters.
+        final Set<QueryModelNode> nodeSet = new HashSet<>(set.getPatterns());
+        nodeSet.addAll(set.getFilters());
+
+        return new JoinSegment<EventQueryNode>(nodeSet, patternList, new HashMap<ValueExpr, Filter>());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java
new file mode 100644
index 0000000..4c50bfb
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.model;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+
+import com.vividsolutions.jts.geom.Geometry;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Query object for a {@link GeoTemporalIndexer}.
+ * Defines a {@link Geometry}, either a {@link TemporalInstant} or
+ * {@link TemporalInterval}, and a triple Subject.
+ * <p>
+ * Instances are created through {@link #builder()} or {@link #builder(Event)}.
+ * The subject is mandatory; the geometry and the temporal value are optional.
+ */
+public class Event {
+    private final Optional<Geometry> geometry;
+    private final Optional<TemporalInstant> instant;
+    private final Optional<TemporalInterval> interval;
+    private final RyaURI subject;
+
+    // Records which constructor was used: true for the instant form, false for the interval form.
+    private final boolean isInstant;
+
+    /**
+     * Creates a new {@link Event} query object with a {@link TemporalInstant}.
+     * @param geo - The {@link Geometry} to use when querying. (nullable)
+     * @param instant - The {@link TemporalInstant} to use when querying. (nullable)
+     * @param subject - The Subject that both statements must have when querying. (not null)
+     */
+    private Event(final Geometry geo, final TemporalInstant instant, final RyaURI subject) {
+        this.subject = requireNonNull(subject);
+
+        //these fields are nullable since they are filled field by field.
+        this.instant = Optional.ofNullable(instant);
+        geometry = Optional.ofNullable(geo);
+        isInstant = true;
+        interval = Optional.empty();
+    }
+
+    /**
+     * Creates a new {@link Event} query object with a {@link TemporalInterval}.
+     * @param geo - The {@link Geometry} to use when querying. (nullable)
+     * @param interval - The {@link TemporalInterval} to use when querying. (nullable)
+     * @param subject - The Subject that both statements must have when querying. (not null)
+     */
+    private Event(final Geometry geo, final TemporalInterval interval, final RyaURI subject) {
+        this.subject = requireNonNull(subject);
+
+        //these fields are nullable since they are filled field by field.
+        this.interval = Optional.ofNullable(interval);
+        geometry = Optional.ofNullable(geo);
+        isInstant = false;
+        instant = Optional.empty();
+    }
+
+    /**
+     * @return Whether or not the query object uses a {@link TemporalInstant}.
+     */
+    public boolean isInstant() {
+        return isInstant;
+    }
+
+    /**
+     * @return The {@link Geometry} to use when querying.
+     */
+    public Optional<Geometry> getGeometry() {
+        return geometry;
+    }
+
+    /**
+     * @return The {@link TemporalInstant} to use when querying.
+     */
+    public Optional<TemporalInstant> getInstant() {
+        return instant;
+    }
+
+    /**
+     * @return The {@link TemporalInterval} to use when querying.
+     */
+    public Optional<TemporalInterval> getInterval() {
+        return interval;
+    }
+
+    /**
+     * @return The statement subject.
+     */
+    public RyaURI getSubject() {
+        return subject;
+    }
+
+    /**
+     * Hashes the subject, the geometry, and whichever temporal value matches
+     * the form of this event (instant or interval).
+     */
+    @Override
+    public int hashCode() {
+        if(isInstant) {
+            return Objects.hash(subject, geometry, instant);
+        } else {
+            return Objects.hash(subject, geometry, interval);
+        }
+    }
+
+    /**
+     * Two events are equal when they have the same form (instant or interval),
+     * the same subject, the same geometry, and the same temporal value for
+     * that form.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if(this == o) {
+            return true;
+        }
+        if(o instanceof Event) {
+            final Event event = (Event) o;
+            // The geometry is compared here because hashCode() includes it; leaving it
+            // out would let two equal events report different hash codes.
+            return isInstant == event.isInstant &&
+                    Objects.equals(subject, event.subject) &&
+                    Objects.equals(geometry, event.geometry) &&
+                    (isInstant ? Objects.equals(instant, event.instant) : Objects.equals(interval, event.interval));
+        }
+        return false;
+    }
+
+    /**
+     * Creates a {@link Builder} pre-populated from an existing {@link Event}.
+     *
+     * @param event - The event whose subject, geometry and temporal value are copied. (not null)
+     * @return A builder holding the values that are present on {@code event}.
+     */
+    public static Builder builder(final Event event) {
+        final Builder builder = new Builder()
+            .setSubject(event.getSubject());
+        event.getGeometry().ifPresent(builder::setGeometry);
+        // Only the temporal value matching the event's form is copied.
+        if(event.isInstant()) {
+            event.getInstant().ifPresent(builder::setTemporalInstant);
+        } else {
+            event.getInterval().ifPresent(builder::setTemporalInterval);
+        }
+        return builder;
+    }
+
+    /**
+     * @return A new, empty {@link Builder}.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builds instances of {@link Event}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static class Builder {
+        private RyaURI subject;
+        private Geometry geo;
+        private TemporalInstant instant;
+        private TemporalInterval interval;
+
+        /**
+         * Sets the {@link RyaURI} subject.
+         * @param subject - The subject to key on the event.
+         * @return This builder.
+         */
+        public Builder setSubject(final RyaURI subject) {
+            this.subject = subject;
+            return this;
+        }
+
+        /**
+         * Sets the {@link Geometry}.
+         * @param geo - The geometry.
+         * @return This builder.
+         */
+        public Builder setGeometry(final Geometry geo) {
+            this.geo = geo;
+            return this;
+        }
+
+        /**
+         * Sets the {@link TemporalInterval}.
+         * @param interval - The interval.
+         * @return This builder.
+         */
+        public Builder setTemporalInterval(final TemporalInterval interval) {
+            this.interval = interval;
+            return this;
+        }
+
+        /**
+         * Sets the {@link TemporalInstant}.
+         * @param instant - The instant.
+         * @return This builder.
+         */
+        public Builder setTemporalInstant(final TemporalInstant instant) {
+            this.instant = instant;
+            return this;
+        }
+
+        /**
+         * Builds the {@link Event}. When an instant has been set the instant
+         * form is built and any interval that was also set is ignored;
+         * otherwise the interval form is built.
+         *
+         * @return The new {@link Event}.
+         * @throws NullPointerException If no subject has been set.
+         */
+        public Event build() {
+            if(instant == null) {
+                return new Event(geo, interval, subject);
+            } else {
+                return new Event(geo, instant, subject);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
new file mode 100644
index 0000000..104fca8
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.model;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.entity.query.EntityQueryNode;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
+import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
+import org.openrdf.query.algebra.evaluation.iterator.CollectionIteration;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.vividsolutions.jts.geom.Geometry;
+
+import info.aduna.iteration.CloseableIteration;
+
+public class EventQueryNode extends ExternalSet implements ExternalBatchingIterator {
+    private final Collection<FunctionCall> usedFilters;
+    private final Collection<IndexingExpr> geoFilters;
+    private final Collection<IndexingExpr> temporalFilters;
+
+    private final StatementPattern geoPattern;
+    private final StatementPattern temporalPattern;
+
+    //Information about the subject of the patterns.
+    private final boolean subjectIsConstant;
+    private final Optional<String> subjectVar;
+    //Only present when the patterns themselves use a constant subject. A subject supplied
+    //  through a binding set is resolved per call inside evaluate() and never stored here.
+    private final Optional<String> subjectConstant;
+
+    //since an EventQueryNode exists in a single segment, all binding names are guaranteed to be assured.
+    //NOTE(review): nothing in this class ever adds to this set, so it is always empty - confirm intended.
+    private final Set<String> bindingNames;
+
+    //The geo pattern followed by the temporal pattern.
+    private final Collection<StatementPattern> patterns;
+
+    private final EventStorage eventStore;
+
+    /**
+     * Constructs an instance of {@link EventQueryNode}.
+     *
+     * @param eventStore - The {@link EventStorage} that will be searched when
+     *   evaluating a query. (not null)
+     * @param geoPattern - The geo {@link StatementPattern}. (not null)
+     * @param temporalPattern - The temporal {@link StatementPattern}. (not null)
+     * @param geoFilters - The geo filters handed to the storage search. (not null)
+     * @param temporalFilters - The temporal filters handed to the storage search. (not null)
+     * @param usedFilters - The {@link FunctionCall}s reported by {@link #getFilters()}. (not null)
+     * @throws IllegalStateException If the two patterns do not share a subject,
+     *   or if either pattern has a variable predicate.
+     */
+    private EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException {
+        this.geoPattern = requireNonNull(geoPattern);
+        this.temporalPattern = requireNonNull(temporalPattern);
+        this.geoFilters = requireNonNull(geoFilters);
+        this.temporalFilters = requireNonNull(temporalFilters);
+        this.eventStore = requireNonNull(eventStore);
+        this.usedFilters = requireNonNull(usedFilters);
+        bindingNames = new HashSet<>();
+
+        patterns = new ArrayList<>();
+        patterns.add(geoPattern);
+        patterns.add(temporalPattern);
+
+        // Subject based preconditions.
+        verifySameSubjects(patterns);
+        // Predicate based preconditions.
+        verifyAllPredicatesAreConstants(patterns);
+
+        // The Subject may either be constant or a variable. Both patterns share
+        // the same subject (verified above), so the geo pattern's is used.
+        final Var subject = geoPattern.getSubjectVar();
+        subjectIsConstant = subject.isConstant();
+        if(subjectIsConstant) {
+            subjectConstant = Optional.of( subject.getValue().toString() );
+            subjectVar = Optional.empty();
+        } else {
+            subjectConstant = Optional.empty();
+            subjectVar = Optional.of( subject.getName() );
+        }
+    }
+
+    @Override
+    public Set<String> getBindingNames() {
+        return bindingNames;
+    }
+
+    @Override
+    public Set<String> getAssuredBindingNames() {
+        return bindingNames;
+    }
+
+    /**
+     * Verify the Subject for all of the patterns is the same.
+     *
+     * @param patterns - The patterns to check. (not null, not empty)
+     * @throws IllegalStateException If all of the Subjects are not the same.
+     */
+    private static void verifySameSubjects(final Collection<StatementPattern> patterns) throws IllegalStateException {
+        requireNonNull(patterns);
+
+        final Iterator<StatementPattern> it = patterns.iterator();
+        final Var subject = it.next().getSubjectVar();
+
+        while(it.hasNext()) {
+            final StatementPattern pattern = it.next();
+            if(!pattern.getSubjectVar().equals(subject)) {
+                throw new IllegalStateException("At least one of the patterns has a different subject from the others. " +
+                        "All subjects must be the same.");
+            }
+        }
+    }
+
+    /**
+     * Verifies all of the Statement Patterns have Constants for their predicates.
+     *
+     * @param patterns - The patterns to check. (not null)
+     * @throws IllegalStateException A pattern has a variable predicate.
+     */
+    private static void verifyAllPredicatesAreConstants(final Collection<StatementPattern> patterns) throws IllegalStateException {
+        requireNonNull(patterns);
+
+        for(final StatementPattern pattern : patterns) {
+            if(!pattern.getPredicateVar().isConstant()) {
+                throw new IllegalStateException("The Predicate of a Statement Pattern must be constant. Pattern: " + pattern);
+            }
+        }
+    }
+
+    /**
+     * Searches the {@link EventStorage} with this node's geo and temporal
+     * filters and returns one binding set per matching {@link Event}.
+     * <p>
+     * The search is restricted to a subject when the patterns use a constant
+     * subject, or when the subject is a variable that {@code bindings} binds.
+     * In every other case a non-empty {@code bindings} is itself added to the
+     * result ahead of the event binding sets.
+     *
+     * @param bindings - The binding set to evaluate against. (not null)
+     * @return An iteration over the collected binding sets.
+     * @throws QueryEvaluationException The storage search failed.
+     */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) throws QueryEvaluationException {
+        final List<BindingSet> list = new ArrayList<>();
+
+        // Resolve the subject for this call only. It is deliberately kept in a local:
+        // writing it back to the node would make every later evaluation reuse the
+        // subject of the first binding set that happened to bind it.
+        Optional<String> subject = subjectConstant;
+        if(!subject.isPresent() && bindings.hasBinding(subjectVar.get())) {
+            subject = Optional.of(bindings.getValue(subjectVar.get()).stringValue());
+        } else if(bindings.size() != 0) {
+            // NOTE(review): the incoming binding set is passed through as a result
+            // unchanged - confirm this is the intended join behavior.
+            list.add(bindings);
+        }
+
+        final Collection<Event> searchEvents;
+        try {
+            final Optional<RyaURI> subjectUri = subject.isPresent()
+                    ? Optional.of(new RyaURI(subject.get()))
+                    : Optional.empty();
+            searchEvents = eventStore.search(subjectUri, Optional.of(geoFilters), Optional.of(temporalFilters));
+        } catch (final ObjectStorageException e) {
+            throw new QueryEvaluationException("Failed to evaluate the binding set", e);
+        }
+
+        for(final Event event : searchEvents) {
+            list.add(toBindingSet(event));
+        }
+        return new CollectionIteration<>(list);
+    }
+
+    /**
+     * Converts an {@link Event} into a binding set that binds the geo pattern's
+     * object to the geometry text and the temporal pattern's object to the
+     * instant or interval, skipping whichever values the event does not have.
+     *
+     * @param event - The event to convert. (not null)
+     * @return The binding set for the event; empty if the event has neither value.
+     */
+    private MapBindingSet toBindingSet(final Event event) {
+        final ValueFactoryImpl vf = ValueFactoryImpl.getInstance();
+        final MapBindingSet resultSet = new MapBindingSet();
+
+        if(event.getGeometry().isPresent()) {
+            final Geometry geo = event.getGeometry().get();
+            final Value geoValue = vf.createLiteral(geo.toText());
+            resultSet.addBinding(geoPattern.getObjectVar().getName(), geoValue);
+        }
+
+        final Value temporalValue;
+        if(event.isInstant() && event.getInstant().isPresent()) {
+            // The instant is converted to UTC before it is formatted.
+            final DateTime dt = event.getInstant().get().getAsDateTime().toDateTime(DateTimeZone.UTC);
+            temporalValue = vf.createLiteral(dt.toString(TemporalInstantRfc3339.FORMATTER));
+        } else if(event.getInterval().isPresent()) {
+            temporalValue = vf.createLiteral(event.getInterval().get().getAsPair());
+        } else {
+            temporalValue = null;
+        }
+
+        if(temporalValue != null) {
+            resultSet.addBinding(temporalPattern.getObjectVar().getName(), temporalValue);
+        }
+        return resultSet;
+    }
+
+    /**
+     * @return The geo filters handed to the storage search.
+     */
+    public Collection<IndexingExpr> getGeoFilters() {
+        return geoFilters;
+    }
+
+    /**
+     * @return The temporal filters handed to the storage search.
+     */
+    public Collection<IndexingExpr> getTemporalFilters() {
+        return temporalFilters;
+    }
+
+    /**
+     * @return The {@link FunctionCall}s this node was built with.
+     */
+    public Collection<FunctionCall> getFilters() {
+        return usedFilters;
+    }
+
+    /**
+     * @return The geo pattern followed by the temporal pattern.
+     */
+    public Collection<StatementPattern> getPatterns() {
+        return patterns;
+    }
+
+    @Override
+    public int hashCode() {
+        // Only fields that equals() also compares are hashed, so that equal
+        // nodes always produce equal hash codes. The event store is therefore
+        // not part of the hash.
+        return Objects.hash(subjectIsConstant,
+                subjectVar,
+                geoFilters,
+                temporalFilters,
+                geoPattern,
+                temporalPattern,
+                bindingNames);
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if(this == other) {
+            return true;
+        }
+        if(other instanceof EventQueryNode) {
+            final EventQueryNode otherNode = (EventQueryNode)other;
+            return new EqualsBuilder()
+                .append(subjectIsConstant, otherNode.subjectIsConstant)
+                .append(subjectVar, otherNode.subjectVar)
+                .append(geoFilters, otherNode.geoFilters)
+                .append(geoPattern, otherNode.geoPattern)
+                .append(temporalFilters, otherNode.temporalFilters)
+                .append(temporalPattern, otherNode.temporalPattern)
+                .append(bindingNames, otherNode.bindingNames)
+                .append(subjectConstant, otherNode.subjectConstant)
+                .isEquals();
+        }
+        return false;
+    }
+
+    /**
+     * @return A new node built from the same storage, patterns and filters.
+     */
+    @Override
+    public EventQueryNode clone() {
+        return new EventQueryNode(eventStore, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Geo Pattern: ").append(geoPattern.toString());
+        sb.append("\n--Geo Filters--\n");
+        for(final IndexingExpr filter : geoFilters) {
+            sb.append(filter.toString()).append("\n");
+        }
+        sb.append("\n-------------------\n");
+        sb.append("Temporal Pattern: ").append(temporalPattern.toString());
+        sb.append("\n--Temporal Filters--\n");
+        for(final IndexingExpr filter : temporalFilters) {
+            sb.append(filter.toString()).append("\n");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Batch evaluation is not implemented.
+     *
+     * @param bindingset - Ignored.
+     * @return Always {@code null}.
+     */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
+            throws QueryEvaluationException {
+        // NOTE(review): callers receive null instead of an iteration - confirm that
+        // nothing invokes the batching path for this node.
+        return null;
+    }
+
+    /**
+     * Builder for {@link EventQueryNode}s.
+     */
+    public static class EventQueryNodeBuilder {
+        private EventStorage store;
+        private StatementPattern geoPattern;
+        private StatementPattern temporalPattern;
+        private Collection<IndexingExpr> geoFilters;
+        private Collection<IndexingExpr> temporalFilters;
+        private Collection<FunctionCall> usedFilters;
+
+        /**
+         * @param store - The {@link EventStorage} to use in the {@link EventQueryNode}
+         * @return - The Builder.
+         */
+        public EventQueryNodeBuilder setStorage(final EventStorage store) {
+            this.store = store;
+            return this;
+        }
+
+        /**
+         * @param geoPattern - The geo {@link StatementPattern} to use in the {@link EventQueryNode}
+         * @return - The Builder.
+         */
+        public EventQueryNodeBuilder setGeoPattern(final StatementPattern geoPattern) {
+            this.geoPattern = geoPattern;
+            return this;
+        }
+
+        /**
+         * @param temporalPattern - The temporal {@link StatementPattern} to use in the {@link EventQueryNode}
+         * @return - The Builder.
+         */
+        public EventQueryNodeBuilder setTemporalPattern(final StatementPattern temporalPattern) {
+            this.temporalPattern = temporalPattern;
+            return this;
+        }
+
+        /**
+         * @param geoFilters - The geo filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
+         * @return - The Builder.
+         */
+        public EventQueryNodeBuilder setGeoFilters(final Collection<IndexingExpr> geoFilters) {
+            this.geoFilters = geoFilters;
+            return this;
+        }
+
+        /**
+         * @param temporalFilters - The temporal filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
+         * @return - The Builder.
+         */
+        public EventQueryNodeBuilder setTemporalFilters(final Collection<IndexingExpr> temporalFilters) {
+            this.temporalFilters = temporalFilters;
+            return this;
+        }
+
+        /**
+         * @param usedFilters - The filter(s) used by the {@link EventQueryNode}
+         * @return - The Builder.
+         */
+        public EventQueryNodeBuilder setUsedFilters(final Collection<FunctionCall> usedFilters) {
+            this.usedFilters = usedFilters;
+            return this;
+        }
+
+        /**
+         * @return The {@link EventQueryNode} built by the builder.
+         * @throws NullPointerException If any of the builder's values has not been set.
+         * @throws IllegalStateException If the patterns do not share a subject
+         *   or a pattern has a variable predicate.
+         */
+        public EventQueryNode build() {
+            return new EventQueryNode(store, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java
new file mode 100644
index 0000000..47c18a0
--- /dev/null
+++ b/extras/rya.geoindexing/geo.common/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.storage;
+
+import java.util.Collection;
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage;
+
+public interface EventStorage extends RyaObjectStorage<Event> {
+    /**
+     * Searches the storage for {@link Event}s.
+     * Will query based on present parameters.
+     *
+     * @param subject - The subject key to find events.
+     * @param geoFilters - The geo filters to find Events.
+     * @param temporalFilters - The temporal filters to find Events.
+     * @return The {@link Event}s found by the search.
+     * @throws ObjectStorageException A problem occurred while fetching the Events from the storage.
+     */
+    Collection<Event> search(Optional<RyaURI> subject, Optional<Collection<IndexingExpr>> geoFilters, Optional<Collection<IndexingExpr>> temporalFilters) throws ObjectStorageException;
+
+    /**
+     * Indicates a problem while interacting with an {@link EventStorage}.
+     */
+    class EventStorageException extends ObjectStorageException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Creates the exception with a detail message and no cause.
+         *
+         * @param message - The detail message, retrievable through {@link #getMessage()}.
+         */
+        public EventStorageException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Creates the exception with a detail message and a cause.
+         *
+         * @param message - The detail message, retrievable through {@link #getMessage()}.
+         * @param cause - The cause, retrievable through {@link #getCause()}.
+         */
+        public EventStorageException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     * An {@link Event} could not be created because one already exists for the Subject.
+     */
+    class EventAlreadyExistsException extends EventStorageException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * @param message - The detail message.
+         */
+        public EventAlreadyExistsException(final String message) {
+            super(message);
+        }
+
+        /**
+         * @param message - The detail message.
+         * @param cause - The cause of this exception.
+         */
+        public EventAlreadyExistsException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     * An {@link Event} could not be updated because the old state does not
+     * match the current state.
+     */
+    class StaleUpdateException extends EventStorageException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * @param message - The detail message.
+         */
+        public StaleUpdateException(final String message) {
+            super(message);
+        }
+
+        /**
+         * @param message - The detail message.
+         * @param cause - The cause of this exception.
+         */
+        public StaleUpdateException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /**
+     *  A {@link EventFilter} is a translation from an {@link IndexingExpr}
+     *  to a format the {@link GeoTemporalIndexer} can use to easily determine which
+     *  filter function is being used.
+     *
+     *   @param <T> - The type of the query object the filter is translated into.
+     */
+    interface EventFilter<T> {
+        /**
+         * Gets the translated query friendly form of the filter.
+         *
+         * @return The query object for this filter.
+         */
+        T getQueryObject();
+    }
+
+    /**
+     * Factory for getting the {@link EventFilter} from an {@link IndexingExpr}.
+     *
+     * @param <T> - The query object type of the {@link EventFilter}s this factory returns.
+     */
+    interface EventFilterFactory<T> {
+        /**
+         * @param filter - The expression to translate.
+         * @return The {@link EventFilter} for the expression.
+         */
+        EventFilter<T> getSearchFunction(IndexingExpr filter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geomesa/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geomesa/pom.xml b/extras/rya.geoindexing/geo.geomesa/pom.xml
new file mode 100644
index 0000000..ebadd36
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geomesa/pom.xml
@@ -0,0 +1,51 @@
+<?xml version='1.0'?>
+
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.rya</groupId>
+		<artifactId>rya.geoindexing</artifactId>
+		<version>3.2.11-incubating-SNAPSHOT</version>
+	</parent>
+	<artifactId>geo.geomesa</artifactId>
+	<name>Apache Rya Geo Indexing using GeoMesa</name>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<geotools.version>14.3</geotools.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.rya</groupId>
+			<artifactId>geo.common</artifactId>
+			<version>3.2.11-incubating-SNAPSHOT</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.locationtech.geomesa</groupId>
+			<artifactId>geomesa-accumulo-datastore_2.11</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.geotools.xsd</groupId>
+			<artifactId>gt-xsd-gml3</artifactId>
+			<version>${geotools.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.geotools</groupId>
+			<artifactId>gt-api</artifactId>
+			<version>${geotools.version}</version>
+		</dependency>
+	</dependencies>
+	</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java b/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java
new file mode 100644
index 0000000..02ef5ba
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geomesa/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoMesaGeoIndexer.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.Md5Hash;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.StatementSerializer;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery;
+import org.geotools.data.DataStore;
+import org.geotools.data.DataStoreFinder;
+import org.geotools.data.DataUtilities;
+import org.geotools.data.FeatureSource;
+import org.geotools.data.FeatureStore;
+import org.geotools.data.Query;
+import org.geotools.factory.CommonFactoryFinder;
+import org.geotools.factory.Hints;
+import org.geotools.feature.DefaultFeatureCollection;
+import org.geotools.feature.FeatureIterator;
+import org.geotools.feature.SchemaException;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
+import org.locationtech.geomesa.accumulo.index.Constants;
+import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.FilterFactory;
+import org.opengis.filter.identity.Identifier;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+
+import info.aduna.iteration.CloseableIteration;
+ 
+/**
+ * A {@link GeoIndexer} wrapper around a GeoMesa {@code AccumuloDataStore}. This class configures and connects to the Datastore, creates the
+ * RDF Feature Type, and interacts with the Datastore.
+ * <p>
+ * Specifically, this class creates an RDF Feature type and stores each RDF Statement as an RDF Feature in the datastore. Each feature
+ * contains the standard set of GeoMesa attributes (Geometry, Start Date, and End Date). The GeoMesaGeoIndexer populates the Geometry
+ * attribute by parsing the Well-Known Text contained in the RDF Statement’s object literal value.
+ * <p>
+ * The RDF Feature contains four additional attributes for each component of the RDF Statement. These attributes are:
+ * <p>
+ * <table border="1">
+ * <tr>
+ * <th>Name</th>
+ * <th>Symbol</th>
+ * <th>Type</th>
+ * </tr>
+ * <tr>
+ * <td>Subject Attribute</td>
+ * <td>S</td>
+ * <td>String</td>
+ * </tr>
+ * <tr>
+ * <td>Predicate Attribute</td>
+ * <td>P</td>
+ * <td>String</td>
+ * </tr>
+ * <tr>
+ * <td>Object Attribute</td>
+ * <td>O</td>
+ * <td>String</td>
+ * </tr>
+ * <tr>
+ * <td>Context Attribute</td>
+ * <td>C</td>
+ * <td>String</td>
+ * </tr>
+ * </table>
+ */
+public class GeoMesaGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer  {
+
+    /** Suffix appended to the Rya instance name to form this index's table name; see {@link #makeTableName(String)}. */
+    private static final String TABLE_SUFFIX = "geo";
+
+    private static final Logger logger = Logger.getLogger(GeoMesaGeoIndexer.class);
+
+    /** Name of the feature type under which statements are stored in the datastore. */
+    private static final String FEATURE_NAME = "RDF";
+
+    // Names of the feature attributes that hold the serialized components of a statement.
+    private static final String SUBJECT_ATTRIBUTE = "S";
+    private static final String PREDICATE_ATTRIBUTE = "P";
+    private static final String OBJECT_ATTRIBUTE = "O";
+    private static final String CONTEXT_ATTRIBUTE = "C";
+    /** Name of the feature attribute that holds the statement's geometry. */
+    private static final String GEOMETRY_ATTRIBUTE = Constants.SF_PROPERTY_GEOMETRY;
+
+    /** Predicates this indexer stores; an empty set means every predicate is accepted. */
+    private Set<URI> validPredicates;
+    private Configuration conf;
+    /** Write view of the feature source; used to add and remove features. */
+    private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore;
+    /** Read view of the datastore; used to run queries. */
+    private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource;
+    private SimpleFeatureType featureType;
+    /** Set once initialization has succeeded so later {@link #setConf(Configuration)} calls do not repeat it. */
+    private boolean isInit = false;
+
+    /**
+     * Stores the configuration and, until it has succeeded once, initializes the index.
+     * <p>
+     * Initialization happens here rather than in a constructor because the index is created using reflection.
+     *
+     * @param conf - The configuration used to set up the index.
+     * @throws RuntimeException if initialization fails with an {@link IOException}.
+     */
+    @Override
+    public void setConf(final Configuration conf) {
+        this.conf = conf;
+        if (isInit) {
+            // Already initialized by an earlier call; only the stored configuration changes.
+            return;
+        }
+
+        try {
+            initInternal();
+        } catch (final IOException ioe) {
+            logger.warn("Unable to initialize index.  Throwing Runtime Exception. ", ioe);
+            throw new RuntimeException(ioe);
+        }
+        isInit = true;
+    }
+
+    /**
+     * @return The configuration most recently passed to {@link #setConf(Configuration)}, or null if none was set.
+     */
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+
+    /**
+     * Reads the indexable predicates from the configuration, connects to the datastore,
+     * resolves the statement feature type, and caches the feature source/store used by
+     * the query and write operations.
+     *
+     * @throws IOException if the datastore or the feature type could not be obtained.
+     * @throws IllegalStateException if the feature source is not also a {@link FeatureStore}.
+     */
+    private void initInternal() throws IOException {
+        validPredicates = ConfigUtils.getGeoPredicates(conf);
+
+        final DataStore store = createDataStore(conf);
+        try {
+            featureType = getStatementFeatureType(store);
+        } catch (final IOException | SchemaException cause) {
+            throw new IOException(cause);
+        }
+
+        featureSource = store.getFeatureSource(featureType.getName());
+        if (featureSource instanceof FeatureStore) {
+            featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource;
+        } else {
+            throw new IllegalStateException("Could not retrieve feature store");
+        }
+    }
+
+    /**
+     * Builds the GeoMesa connection parameters from the configuration and looks up the
+     * matching {@link DataStore}.
+     *
+     * @param conf - The configuration that holds the Accumulo connection details. (not null)
+     * @return The datastore for this index's table.
+     * @throws IOException if the lookup fails, or if no datastore factory accepts the parameters.
+     */
+    private static DataStore createDataStore(final Configuration conf) throws IOException {
+        // get the configuration parameters
+        final Instance instance = ConfigUtils.getInstance(conf);
+        final boolean useMock = instance instanceof MockInstance;
+        final String instanceId = instance.getInstanceName();
+        final String zookeepers = instance.getZooKeepers();
+        final String user = ConfigUtils.getUsername(conf);
+        final String password = ConfigUtils.getPassword(conf);
+        final String auths = ConfigUtils.getAuthorizations(conf).toString();
+        final String tableName = getTableName(conf);
+        final int numPartitions = OptionalConfigUtils.getGeoNumPartitions(conf);
+
+        final String featureSchemaFormat = "%~#s%" + numPartitions + "#r%" + FEATURE_NAME
+                + "#cstr%0,3#gh%yyyyMMdd#d::%~#s%3,2#gh::%~#s%#id";
+        // build the map of parameters
+        final Map<String, Serializable> params = new HashMap<>();
+        params.put("instanceId", instanceId);
+        params.put("zookeepers", zookeepers);
+        params.put("user", user);
+        params.put("password", password);
+        params.put("auths", auths);
+        params.put("tableName", tableName);
+        params.put("indexSchemaFormat", featureSchemaFormat);
+        params.put("useMock", Boolean.toString(useMock));
+
+        // fetch the data store from the finder
+        final DataStore dataStore = DataStoreFinder.getDataStore(params);
+        if (dataStore == null) {
+            // The finder returns null when no registered factory can process the parameters.
+            // Fail here with context instead of with a NullPointerException further on.
+            throw new IOException("Could not find a data store for Accumulo instance '" + instanceId
+                    + "' and table '" + tableName + "'.");
+        }
+        return dataStore;
+    }
+
+    private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException {
+        SimpleFeatureType featureType;
+
+        final String[] datastoreFeatures = dataStore.getTypeNames();
+        if (Arrays.asList(datastoreFeatures).contains(FEATURE_NAME)) {
+            featureType = dataStore.getSchema(FEATURE_NAME);
+        } else {
+            final String featureSchema = SUBJECT_ATTRIBUTE + ":String," //
+                    + PREDICATE_ATTRIBUTE + ":String," //
+                    + OBJECT_ATTRIBUTE + ":String," //
+                    + CONTEXT_ATTRIBUTE + ":String," //
+                    + GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326;geomesa.mixed.geometries='true'";
+            featureType = SimpleFeatureTypes.createType(FEATURE_NAME, featureSchema);
+            dataStore.createSchema(featureType);
+        }
+        return featureType;
+    }
+
+    /**
+     * Indexes every statement that has an accepted predicate and a literal object whose
+     * geometry can be parsed. Statements whose geometry cannot be parsed are logged and skipped.
+     *
+     * @param ryaStatements - The statements to index. (not null)
+     * @throws IOException if the features could not be written to the feature store.
+     */
+    @Override
+    public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException {
+        final DefaultFeatureCollection toStore = new DefaultFeatureCollection();
+
+        for (final RyaStatement ryaStatement : ryaStatements) {
+            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+
+            // An empty predicate list accepts all predicates; otherwise the predicate must be listed.
+            if (!validPredicates.isEmpty() && !validPredicates.contains(statement.getPredicate())) {
+                continue;
+            }
+            if (!(statement.getObject() instanceof Literal)) {
+                continue;
+            }
+
+            try {
+                toStore.add(createFeature(featureType, statement));
+            } catch (final ParseException e) {
+                logger.warn("Error getting geo from statement: " + statement.toString(), e);
+            }
+        }
+
+        // Nothing qualified, so there is nothing to write.
+        if (toStore.isEmpty()) {
+            return;
+        }
+        featureStore.addFeatures(toStore);
+    }
+
+    /**
+     * Indexes a single statement by delegating to {@link #storeStatements(Collection)}.
+     *
+     * @param statement - The statement to index.
+     * @throws IOException if the statement could not be written.
+     */
+    @Override
+    public void storeStatement(final RyaStatement statement) throws IOException {
+        storeStatements(Collections.singleton(statement));
+    }
+
+    /**
+     * Converts a statement into a feature whose ID is the MD5 hash of the serialized statement
+     * and whose attributes hold the serialized subject, predicate, object, and context along
+     * with the parsed geometry.
+     *
+     * @param featureType - The feature type to build the feature for. (not null)
+     * @param statement - The statement to convert. (not null)
+     * @return The feature that represents the statement.
+     * @throws ParseException if no valid, non-empty geometry could be read from the statement.
+     */
+    private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException {
+        final String subjectValue = StatementSerializer.writeSubject(statement);
+        final String predicateValue = StatementSerializer.writePredicate(statement);
+        final String objectValue = StatementSerializer.writeObject(statement);
+        final String contextValue = StatementSerializer.writeContext(statement);
+
+        // The feature starts out without attribute values and is identified by the statement's hash.
+        final String featureId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement));
+        final SimpleFeature feature = SimpleFeatureBuilder.build(featureType, new Object[0], featureId);
+
+        final Geometry geometry = GeoParseUtils.getGeometry(statement, new GmlParser());
+        final boolean usable = geometry != null && !geometry.isEmpty() && geometry.isValid();
+        if (!usable) {
+            throw new ParseException("Could not create geometry for statement " + statement);
+        }
+        feature.setDefaultGeometry(geometry);
+
+        feature.setAttribute(SUBJECT_ATTRIBUTE, subjectValue);
+        feature.setAttribute(PREDICATE_ATTRIBUTE, predicateValue);
+        feature.setAttribute(OBJECT_ATTRIBUTE, objectValue);
+        feature.setAttribute(CONTEXT_ATTRIBUTE, contextValue);
+
+        // Keep the ID computed above; setting this hint to FALSE would have GeoTools generate IDs.
+        feature.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+
+        return feature;
+    }
+
+    /**
+     * Builds an ECQL filter from a spatial predicate, a geometry, and the optional
+     * subject/context/predicate constraints, and returns an iteration over the matches.
+     *
+     * @param type - The name of the ECQL spatial predicate to apply, e.g. {@code INTERSECTS}. (not null)
+     * @param geometry - The geometry the indexed geometries are compared against. (not null)
+     * @param contraints - Restricts the subject, context, and/or predicates of the results. (not null)
+     * @return An iteration over the statements that satisfy the filter.
+     */
+    private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry,
+            final StatementConstraints contraints) {
+        final List<String> filterParms = new ArrayList<String>();
+
+        filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )");
+
+        if (contraints.hasSubject()) {
+            filterParms.add("( " + SUBJECT_ATTRIBUTE + "= '" + escapeCqlLiteral(contraints.getSubject()) + "') ");
+        }
+        if (contraints.hasContext()) {
+            filterParms.add("( " + CONTEXT_ATTRIBUTE + "= '" + escapeCqlLiteral(contraints.getContext()) + "') ");
+        }
+        if (contraints.hasPredicates()) {
+            final List<String> predicates = new ArrayList<String>();
+            for (final URI u : contraints.getPredicates()) {
+                predicates.add("( " + PREDICATE_ATTRIBUTE + "= '" + escapeCqlLiteral(u.stringValue()) + "') ");
+            }
+            filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")");
+        }
+
+        final String filterString = StringUtils.join(filterParms, " AND ");
+        logger.info("Performing geomesa query : " + filterString);
+
+        return getIteratorWrapper(filterString);
+    }
+
+    /**
+     * Escapes a value so it can be embedded between single quotes in an ECQL filter.
+     * A single quote inside the value is doubled; otherwise it would end the literal
+     * early and break (or change the meaning of) the filter.
+     *
+     * @param value - The value to embed; its string form is used.
+     * @return The string form of the value with every single quote doubled.
+     */
+    private static String escapeCqlLiteral(final Object value) {
+        return String.valueOf(value).replace("'", "''");
+    }
+
+    /**
+     * Wraps an ECQL filter in an iteration that runs the query lazily: the filter is parsed and
+     * the query is issued the first time a result is requested.
+     *
+     * @param filterString - The ECQL filter to evaluate. (not null)
+     * @return An iteration over the statements stored in the matching features.
+     */
+    private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final String filterString) {
+
+        return new CloseableIteration<Statement, QueryEvaluationException>() {
+
+            // Stays null until the query has actually been issued.
+            private FeatureIterator<SimpleFeature> featureIterator = null;
+
+            /**
+             * Issues the query on first use and returns the same feature iterator afterwards.
+             *
+             * @throws QueryEvaluationException if the filter cannot be parsed or the query fails.
+             */
+            FeatureIterator<SimpleFeature> getIterator() throws QueryEvaluationException {
+                if (featureIterator == null) {
+                    Filter cqlFilter;
+                    try {
+                        cqlFilter = ECQL.toFilter(filterString);
+                    } catch (final CQLException e) {
+                        logger.error("Error parsing query: " + filterString, e);
+                        throw new QueryEvaluationException(e);
+                    }
+
+                    final Query query = new Query(featureType.getTypeName(), cqlFilter);
+                    try {
+                        featureIterator = featureSource.getFeatures(query).features();
+                    } catch (final IOException e) {
+                        logger.error("Error performing query: " + filterString, e);
+                        throw new QueryEvaluationException(e);
+                    }
+                }
+                return featureIterator;
+            }
+
+            @Override
+            public boolean hasNext() throws QueryEvaluationException {
+                return getIterator().hasNext();
+            }
+
+            @Override
+            public Statement next() throws QueryEvaluationException {
+                final SimpleFeature feature = getIterator().next();
+                final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString();
+                final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString();
+                final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString();
+                final String contextString = feature.getAttribute(CONTEXT_ATTRIBUTE).toString();
+                return StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Remove not implemented");
+            }
+
+            @Override
+            public void close() throws QueryEvaluationException {
+                // Only close an iterator that was actually opened. Going through getIterator()
+                // here would issue the query just to discard its results, and could fail.
+                if (featureIterator != null) {
+                    featureIterator.close();
+                }
+            }
+        };
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code EQUALS} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("EQUALS", query, contraints);
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code DISJOINT} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("DISJOINT", query, contraints);
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code INTERSECTS} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("INTERSECTS", query, contraints);
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code TOUCHES} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("TOUCHES", query, contraints);
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code CROSSES} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("CROSSES", query, contraints);
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code WITHIN} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("WITHIN", query, contraints);
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code CONTAINS} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("CONTAINS", query, contraints);
+    }
+
+    /**
+     * Finds the indexed statements whose geometry satisfies the {@code OVERLAPS} spatial
+     * predicate against the query geometry, restricted by the given constraints.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) {
+        return performQuery("OVERLAPS", query, contraints);
+    }
+    
+    /**
+     * Near queries are not implemented by this indexer.
+     *
+     * @throws UnsupportedOperationException always.
+     */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query,
+            final StatementConstraints contraints) {
+        throw new UnsupportedOperationException("Near queries are not supported in Accumulo.");
+    }
+
+    /**
+     * @return The predicates read from the configuration during initialization. The store and
+     *   delete operations treat an empty set as "accept every predicate".
+     */
+    @Override
+    public Set<URI> getIndexablePredicates() {
+        return validPredicates;
+    }
+
+    /**
+     * Currently does nothing: features are written as soon as they are stored, so there is
+     * nothing buffered to flush.
+     */
+    @Override
+    public void flush() throws IOException {
+        // TODO cache and flush features instead of writing them one at a time
+    }
+
+    /**
+     * Closes the indexer by calling {@link #flush()}; no other resources are released here.
+     */
+    @Override
+    public void close() throws IOException {
+        flush();
+    }
+
+
+    /**
+     * @return The name of the table used by this index, derived from the current configuration.
+     */
+    @Override
+    public String getTableName() {
+        return getTableName(conf);
+    }
+
+    /**
+     * Get the Accumulo table that will be used by this index.
+     *
+     * @param conf - The configuration the table prefix is read from.
+     * @return table name guaranteed to be used by instances of this index
+     */
+    public static String getTableName(final Configuration conf) {
+        return makeTableName( ConfigUtils.getTablePrefix(conf) );
+    }
+
+    /**
+     * Builds the name of the Accumulo table this indexer uses for a given Rya instance by
+     * appending the index's table suffix to the instance name.
+     *
+     * @param ryaInstanceName - The name of the Rya instance the table name is for. (not null)
+     * @return The Accumulo table name used by this indexer for a specific instance of Rya.
+     * @throws NullPointerException if {@code ryaInstanceName} is null.
+     */
+    public static String makeTableName(final String ryaInstanceName) {
+        final String instanceName = requireNonNull(ryaInstanceName);
+        return instanceName + TABLE_SUFFIX;
+    }
+
+    /**
+     * Removes from the index the features that correspond to the given statements. A statement
+     * is considered only if it has an accepted predicate and a literal object whose geometry can
+     * be parsed; statements whose geometry cannot be parsed are logged and skipped.
+     *
+     * @param ryaStatements - The statements to remove from the index. (not null)
+     * @throws IOException if the features could not be removed from the feature store.
+     */
+    private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException {
+        // Rebuild the features so their IDs can be used to address the stored ones.
+        final DefaultFeatureCollection toRemove = new DefaultFeatureCollection();
+
+        for (final RyaStatement ryaStatement : ryaStatements) {
+            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+
+            // An empty predicate list accepts all predicates; otherwise the predicate must be listed.
+            if (!validPredicates.isEmpty() && !validPredicates.contains(statement.getPredicate())) {
+                continue;
+            }
+            if (!(statement.getObject() instanceof Literal)) {
+                continue;
+            }
+
+            try {
+                toRemove.add(createFeature(featureType, statement));
+            } catch (final ParseException e) {
+                logger.warn("Error getting geo from statement: " + statement.toString(), e);
+            }
+        }
+
+        if (toRemove.isEmpty()) {
+            return;
+        }
+
+        // Remove by feature ID.
+        final Set<Identifier> featureIds = new HashSet<Identifier>();
+        final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null);
+        for (final String fid : DataUtilities.fidSet(toRemove)) {
+            featureIds.add(filterFactory.featureId(fid));
+        }
+        featureStore.removeFeatures(filterFactory.id(featureIds));
+    }
+
+
+    /**
+     * Removes a single statement from the index.
+     *
+     * @param statement - The statement to remove.
+     * @throws IOException if the statement could not be removed.
+     */
+    @Override
+    public void deleteStatement(final RyaStatement statement) throws IOException {
+        deleteStatements(Collections.singleton(statement));
+    }
+
+    /**
+     * Does nothing; this indexer is initialized in {@link #setConf(Configuration)}.
+     */
+    @Override
+    public void init() {
+    }
+
+    /**
+     * Does nothing; the provided connector is ignored by this indexer.
+     */
+    @Override
+    public void setConnector(final Connector connector) {
+    }
+
+    /**
+     * Does nothing in this implementation.
+     */
+    @Override
+    public void destroy() {
+    }
+
+    /**
+     * Does nothing in this implementation; no indexed data is removed.
+     */
+    @Override
+    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+    }
+
+    /**
+     * Does nothing in this implementation; no table is dropped.
+     */
+    @Override
+    public void dropAndDestroy() {
+    }
+}