You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/06/14 18:50:13 UTC
[1/5] incubator-rya git commit: RYA-240 GeoTemporal framework
Repository: incubator-rya
Updated Branches:
refs/heads/master f8d02eb3d -> 646d21b4e
RYA-240 GeoTemporal framework
Interfaces and integration test for geotemporal indexing.
The Integration test is incomplete as the expected is
currently unknown.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/af5964aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/af5964aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/af5964aa
Branch: refs/heads/master
Commit: af5964aa72ebb3436666929a57a5ee7469732093
Parents: f8d02eb
Author: me@devbox.(none) <me@devbox.(none)>
Authored: Wed Jan 18 12:48:11 2017 -0500
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Wed Jun 14 13:27:42 2017 -0400
----------------------------------------------------------------------
.../GeoTemporalExternalSetMatcherFactory.java | 44 +++
.../geotemporal/GeoTemporalIndexException.java | 57 ++++
.../GeoTemporalIndexSetProvider.java | 251 ++++++++++++++++
.../geotemporal/GeoTemporalIndexer.java | 142 ++++++++++
.../geotemporal/GeoTemporalIndexerFactory.java | 53 ++++
.../geotemporal/GeoTemporalOptimizer.java | 69 +++++
.../GeoTemporalToSegmentConverter.java | 51 ++++
.../rya/indexing/geotemporal/model/Event.java | 218 ++++++++++++++
.../geotemporal/model/EventQueryNode.java | 283 +++++++++++++++++++
.../geotemporal/storage/EventStorage.java | 130 +++++++++
10 files changed, 1298 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java
new file mode 100644
index 0000000..c4a287e
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalExternalSetMatcherFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.rya.indexing.external.matching.AbstractExternalSetMatcherFactory;
+import org.apache.rya.indexing.external.matching.ExternalSetMatcher;
+import org.apache.rya.indexing.external.matching.JoinSegment;
+import org.apache.rya.indexing.external.matching.JoinSegmentMatcher;
+import org.apache.rya.indexing.external.matching.OptionalJoinSegment;
+import org.apache.rya.indexing.external.matching.OptionalJoinSegmentMatcher;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+
+/**
+ * Factory used to build {@link ExternalSetMatcher}s over {@link EventQueryNode}s for the {@link GeoTemporalOptimizer}.
+ * Each matcher is given a new {@link GeoTemporalToSegmentConverter}.
+ */
+public class GeoTemporalExternalSetMatcherFactory extends AbstractExternalSetMatcherFactory<EventQueryNode> {
+
+ @Override
+ protected ExternalSetMatcher<EventQueryNode> getJoinSegmentMatcher(final JoinSegment<EventQueryNode> segment) {
+ return new JoinSegmentMatcher<EventQueryNode>(segment, new GeoTemporalToSegmentConverter());
+ }
+
+ @Override
+ protected ExternalSetMatcher<EventQueryNode> getOptionalJoinSegmentMatcher(final OptionalJoinSegment<EventQueryNode> segment) {
+ return new OptionalJoinSegmentMatcher<EventQueryNode>(segment, new GeoTemporalToSegmentConverter());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java
new file mode 100644
index 0000000..b2d4de5
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexException.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.rya.indexing.entity.model.TypedEntity;
+
+/**
+ * An operation over the geotemporal index failed to complete.
+ */
+public class GeoTemporalIndexException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public GeoTemporalIndexException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A {@code null} value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public GeoTemporalIndexException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
new file mode 100644
index 0000000..38790c4
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IndexingFunctionRegistry;
+import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
+import org.apache.rya.indexing.external.matching.ExternalSetProvider;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Provides {@link GeoTupleSet}s.
+ */
+public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQueryNode> {
+ //organzied by object var. Each object is a filter, or set of filters
+ private Multimap<Var, IndexingExpr> filterMap;
+
+ //organzied by subject var. Each subject is a GeoTemporalTupleSet
+ private Multimap<Var, StatementPattern> patternMap;
+
+ //filters that have not been constrained by statement patterns into indexing expressions yet.
+ private Multimap<Var, FunctionCall> unmatchedFilters;
+ //filters that have been used, to be used by the matcher later.
+ private Multimap<Var, FunctionCall> matchedFilters;
+
+ //organzied by object var. Used to find matches between unmatch filters and patterns
+ private Map<Var, StatementPattern> objectPatterns;
+
+
+ private static URI filterURI;
+
+ private final EventStorage eventStorage;
+
+ public GeoTemporalIndexSetProvider(final EventStorage eventStorage) {
+ this.eventStorage = requireNonNull(eventStorage);
+ }
+
+ @Override
+ public List<EventQueryNode> getExternalSets(final QuerySegment<EventQueryNode> node) {
+ filterMap = HashMultimap.create();
+ patternMap = HashMultimap.create();
+ unmatchedFilters = HashMultimap.create();
+ matchedFilters = HashMultimap.create();
+
+ objectPatterns = new HashMap<>();
+ //discover entities
+ buildMaps(node);
+ final List<EventQueryNode> nodes = createNodes();
+
+ return nodes;
+ }
+
+ private List<EventQueryNode> createNodes() {
+ final List<EventQueryNode> nodes = new ArrayList<>();
+ for(final Var subj : patternMap.keySet()) {
+ final EventQueryNode node = getGeoTemporalNode(subj);
+ if(node != null) {
+ nodes.add(node);
+ }
+ }
+ return nodes;
+ }
+
+ private EventQueryNode getGeoTemporalNode(final Var subj) {
+ final Collection<StatementPattern> patterns = patternMap.get(subj);
+ final Collection<FunctionCall> usedFilters = new ArrayList<>();
+ Optional<StatementPattern> geoPattern = Optional.empty();
+ Optional<StatementPattern> temporalPattern = Optional.empty();
+ Optional<Collection<IndexingExpr>> geoFilters = Optional.empty();
+ Optional<Collection<IndexingExpr>> temporalFilters = Optional.empty();
+
+ //should only be 2 patterns.
+ for(final StatementPattern sp : patterns) {
+ final Var obj = sp.getObjectVar();
+
+ ///filter map does not have -const-
+
+
+ if(filterMap.containsKey(obj)) {
+ final Collection<IndexingExpr> filters = filterMap.get(obj);
+ final IndexingFunctionRegistry.FUNCTION_TYPE type = ensureSameType(filters);
+ if(type != null && type == FUNCTION_TYPE.GEO) {
+ geoPattern = Optional.of(sp);
+ geoFilters = Optional.of(filters);
+ usedFilters.addAll(matchedFilters.get(obj));
+ } else if(type != null && type == FUNCTION_TYPE.TEMPORAL) {
+ temporalPattern = Optional.of(sp);
+ temporalFilters = Optional.of(filters);
+ usedFilters.addAll(matchedFilters.get(obj));
+ } else {
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+
+ if(geoFilters.isPresent() && temporalFilters.isPresent() && geoPattern.isPresent() && temporalPattern.isPresent()) {
+ return new EventQueryNode(eventStorage, geoPattern.get(), temporalPattern.get(), geoFilters.get(), temporalFilters.get(), usedFilters);
+ } else {
+ return null;
+ }
+ }
+
+ private FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) {
+ FUNCTION_TYPE type = null;
+ for(final IndexingExpr filter : filters) {
+ if(type == null) {
+ type = IndexingFunctionRegistry.getFunctionType(filter.getFunction());
+ } else {
+ if(IndexingFunctionRegistry.getFunctionType(filter.getFunction()) != type) {
+ return null;
+ }
+ }
+ }
+ return type;
+ }
+
+ private void buildMaps(final QuerySegment<EventQueryNode> node) {
+ final List<QueryModelNode> unused = new ArrayList<>();
+ for (final QueryModelNode pattern : node.getOrderedNodes()) {
+ if(pattern instanceof FunctionCall) {
+ discoverFilter((FunctionCall) pattern, unused);
+ }
+ if(pattern instanceof StatementPattern) {
+ discoverPatterns((StatementPattern) pattern, unused);
+ }
+ }
+ }
+
+ private void discoverFilter(final FunctionCall filter, final List<QueryModelNode> unmatched) {
+ try {
+ filter.visit(new FilterVisitor());
+ } catch (final Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void discoverPatterns(final StatementPattern pattern, final List<QueryModelNode> unmatched) {
+ final Var subj = pattern.getSubjectVar();
+ final Var objVar = pattern.getObjectVar();
+
+ patternMap.put(subj, pattern);
+ objectPatterns.put(objVar, pattern);
+ //check for existing filters.
+ if(unmatchedFilters.containsKey(objVar)) {
+ final Collection<FunctionCall> calls = unmatchedFilters.removeAll(objVar);
+ for(final FunctionCall call : calls) {
+ addFilter(call);
+ matchedFilters.put(objVar, call);
+ }
+ }
+ }
+
+ @Override
+ public Iterator<List<EventQueryNode>> getExternalSetCombos(final QuerySegment<EventQueryNode> segment) {
+ final List<List<EventQueryNode>> comboList = new ArrayList<>();
+ comboList.add(getExternalSets(segment));
+ return comboList.iterator();
+ }
+
+ private void addFilter(final FunctionCall call) {
+ filterURI = new URIImpl(call.getURI());
+ final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
+ filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call)));
+ }
+
+ private Value[] extractArguments(final String matchName, final FunctionCall call) {
+ final Value args[] = new Value[call.getArgs().size() - 1];
+ int argI = 0;
+ for (int i = 0; i != call.getArgs().size(); ++i) {
+ final ValueExpr arg = call.getArgs().get(i);
+ if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
+ continue;
+ }
+ if (arg instanceof ValueConstant) {
+ args[argI] = ((ValueConstant)arg).getValue();
+ } else if (arg instanceof Var && ((Var)arg).hasValue()) {
+ args[argI] = ((Var)arg).getValue();
+ } else {
+ throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
+ }
+ ++argI;
+ }
+ return args;
+ }
+
+ /**
+ * Finds the object/function in a Filter. If the associated statement pattern
+ * has been found, creates the {@link IndexingExpr} and adds it to the map.
+ */
+ private class FilterVisitor extends QueryModelVisitorBase<Exception> {
+ @Override
+ public void meet(final FunctionCall call) throws Exception {
+
+ filterURI = new URIImpl(call.getURI());
+ final FUNCTION_TYPE type = IndexingFunctionRegistry.getFunctionType(filterURI);
+ if(type == FUNCTION_TYPE.GEO || type == FUNCTION_TYPE.TEMPORAL) {
+ final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
+ if(objectPatterns.containsKey(objVar)) {
+ filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call)));
+ matchedFilters.put(objVar, call);
+ } else {
+ unmatchedFilters.put(objVar, call);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
new file mode 100644
index 0000000..01b254b
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.URIImpl;
+
+/**
+ * A repository to store, index, and retrieve {@link Statement}s based on geotemporal features.
+ */
+public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
+
+ /**
+ * Creates the {@link EventStorage} that will be used by the indexer.
+ *
+ * @param conf - Indicates how the {@link EventStorage} is initialized. (not null)
+ * @return The {@link EventStorage} that will be used by this indexer.
+ */
+ public abstract EventStorage getEventStorage(final Configuration conf);
+
+ public enum GeoPolicy {
+ EQUALS(GeoConstants.GEO_SF_EQUALS),
+ DISJOINT(GeoConstants.GEO_SF_DISJOINT),
+ INTERSECTS(GeoConstants.GEO_SF_INTERSECTS),
+ TOUCHES(GeoConstants.GEO_SF_TOUCHES),
+ CROSSES(GeoConstants.GEO_SF_CROSSES),
+ WITHIN(GeoConstants.GEO_SF_WITHIN),
+ CONTAINS(GeoConstants.GEO_SF_CONTAINS),
+ OVERLAPS(GeoConstants.GEO_SF_OVERLAPS);
+
+ private final URI uri;
+
+ private GeoPolicy(final URI uri) {
+ this.uri = uri;
+ }
+
+ public URI getURI() {
+ return uri;
+ }
+
+ public static GeoPolicy fromURI(final URI uri) {
+ for(final GeoPolicy policy : GeoPolicy.values()) {
+ if(policy.getURI().equals(uri)) {
+ return policy;
+ }
+ }
+ return null;
+ }
+ }
+
+ String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
+ /**
+ * All of the filter functions that can be used in a temporal based query.
+ * Each policy is identified by a function URI in the {@link GeoTemporalIndexer#TEMPORAL_NS} namespace.
+ */
+ public enum TemporalPolicy {
+ /**
+ * The provided instant in time equals the instant the event took place.
+ */
+ INSTANT_EQUALS_INSTANT(true, new URIImpl(TEMPORAL_NS+"equals")),
+
+ /**
+ * The provided instant in time was before when the event took place.
+ */
+ INSTANT_BEFORE_INSTANT(true, new URIImpl(TEMPORAL_NS+"before")),
+
+ /**
+ * The provided instant in time was after when the event took place.
+ */
+ INSTANT_AFTER_INSTANT(true, new URIImpl(TEMPORAL_NS+"after")),
+
+ /**
+ * The provided instant in time was before a time period.
+ */
+ INSTANT_BEFORE_INTERVAL(false, new URIImpl(TEMPORAL_NS+"beforeInterval")),
+
+ /**
+ * The provided instant in time took place within a time period.
+ */
+ INSTANT_IN_INTERVAL(false, new URIImpl(TEMPORAL_NS+"insideInterval")),
+
+ /**
+ * The provided instant in time took place after a time period.
+ */
+ INSTANT_AFTER_INTERVAL(false, new URIImpl(TEMPORAL_NS+"afterInterval")),
+
+ /**
+ * The provided instant in time equals the beginning of a time period.
+ */
+ INSTANT_START_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasBeginningInterval")),
+ INSTANT_END_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasEndInterval")),
+ INTERVAL_EQUALS(false, new URIImpl(TEMPORAL_NS+"intervalEquals")),
+ INTERVAL_BEFORE(false, new URIImpl(TEMPORAL_NS+"intervalBefore")),
+ INTERVAL_AFTER(false, new URIImpl(TEMPORAL_NS+"intervalAfter"));
+
+ private final boolean isInstant;
+ private final URI uri;
+
+ TemporalPolicy(final boolean isInstant, final URI uri) {
+ this.isInstant = isInstant;
+ this.uri = uri;
+ }
+
+ public boolean isInstant(){
+ return isInstant;
+ }
+
+ public URI getURI() {
+ return uri;
+ }
+
+ public static TemporalPolicy fromURI(final URI uri) {
+ for(final TemporalPolicy policy : TemporalPolicy.values()) {
+ if(policy.getURI().equals(uri)) {
+ return policy;
+ }
+ }
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java
new file mode 100644
index 0000000..a7ba8fa
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexerFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoSecondaryIndex;
+
+/**
+ * Factory for retrieving a {@link GeoTemporalIndexer} based on a provided {@link Configuration}.
+ */
+public class GeoTemporalIndexerFactory {
+ /**
+ * Returns a {@link GeoTemporalIndexer} found among the configuration's additional indexers, otherwise creates and initializes a new one.
+ * @param conf - The {@link Configuration} to base the {@link GeoTemporalIndexer} on.
+ * @return The {@link GeoTemporalIndexer}, or {@code null} when the configuration does not use Mongo.
+ */
+ public GeoTemporalIndexer getIndexer(final Configuration conf) {
+ if(ConfigUtils.getUseMongo(conf)) {
+ final MongoDBRdfConfiguration config = new MongoDBRdfConfiguration(conf);
+ for(final MongoSecondaryIndex index : config.getAdditionalIndexers()) {
+ if(index instanceof GeoTemporalIndexer) {
+ return (GeoTemporalIndexer) index;
+ }
+ }
+ final MongoGeoTemporalIndexer index = new MongoGeoTemporalIndexer();
+ index.setConf(conf);
+ index.init();
+ return index;
+ } else {
+ //TODO: add Accumulo here.
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java
new file mode 100644
index 0000000..d626adc
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalOptimizer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.external.matching.AbstractExternalSetOptimizer;
+import org.apache.rya.indexing.external.matching.ExternalSetMatcher;
+import org.apache.rya.indexing.external.matching.ExternalSetProvider;
+import org.apache.rya.indexing.external.matching.QueryNodeListRater;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+
+import com.google.common.base.Optional;
+
+/** External-set optimizer for {@link EventQueryNode}s whose provider reads from the {@link GeoTemporalIndexer}'s event storage. */
+public class GeoTemporalOptimizer extends AbstractExternalSetOptimizer<EventQueryNode> implements Configurable {
+ private static final GeoTemporalExternalSetMatcherFactory MATCHER_FACTORY = new GeoTemporalExternalSetMatcherFactory();
+
+ private GeoTemporalIndexer indexer;
+ private GeoTemporalIndexSetProvider provider;
+ private Configuration conf;
+
+ @Override
+ public void setConf(final Configuration conf) {
+ this.conf = conf;
+ final GeoTemporalIndexerFactory factory = new GeoTemporalIndexerFactory();
+ indexer = factory.getIndexer(conf);
+ java.util.Objects.requireNonNull(indexer, "No GeoTemporalIndexer is available for the provided configuration.");
+ //conf here does not matter since EventStorage has already been set in the indexer.
+ provider = new GeoTemporalIndexSetProvider(indexer.getEventStorage(conf));
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ protected ExternalSetMatcher<EventQueryNode> getMatcher(final QuerySegment<EventQueryNode> segment) {
+ return MATCHER_FACTORY.getMatcher(segment);
+ }
+
+ @Override
+ protected ExternalSetProvider<EventQueryNode> getProvider() {
+ return provider;
+ }
+
+ @Override
+ protected Optional<QueryNodeListRater> getNodeListRater(final QuerySegment<EventQueryNode> segment) {
+ return Optional.absent(); //absent rather than null: no geotemporal-specific rater is supplied.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java
new file mode 100644
index 0000000..22bfdb1
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalToSegmentConverter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.indexing.external.matching.ExternalSetConverter;
+import org.apache.rya.indexing.external.matching.JoinSegment;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.ValueExpr;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link ExternalSetConverter} to convert {@link EventQueryNode}s
+ * to {@link QuerySegment}s. The {@link JoinSegment} is built from a set holding the node's
+ * patterns and filters, a list holding only its patterns, and an empty filter map.
+ */
+public class GeoTemporalToSegmentConverter implements ExternalSetConverter<EventQueryNode> {
+ @Override
+ public QuerySegment<EventQueryNode> setToSegment(final EventQueryNode set) {
+ Preconditions.checkNotNull(set);
+ final Set<QueryModelNode> matched = new HashSet<>(set.getPatterns());
+ matched.addAll(set.getFilters());
+ final List<QueryModelNode> unmatched = new ArrayList<>(set.getPatterns());
+ return new JoinSegment<EventQueryNode>(matched, unmatched, new HashMap<ValueExpr, Filter>());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java
new file mode 100644
index 0000000..4c50bfb
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/Event.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.model;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+
+import com.vividsolutions.jts.geom.Geometry;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Query object for a {@link GeoTemporalIndexer}.
+ * Defines a {@link Geometry}, either a {@link TemporalInstant} or
+ * {@link TemporalInterval}, and a triple Subject.
+ * <p>
+ * All fields are final and instances are created through a {@link Builder}.
+ * Only the subject is required; the geometry and the temporal value may be absent.
+ */
+public class Event {
+    private final Optional<Geometry> geometry;
+    private final Optional<TemporalInstant> instant;
+    private final Optional<TemporalInterval> interval;
+    private final RyaURI subject;
+
+    // Records which constructor built this event; it is true for the instant
+    // constructor even when the instant itself is absent.
+    private final boolean isInstant;
+
+    /**
+     * Creates a new {@link Event} query object with a {@link TemporalInstant}.
+     * @param geo - The {@link Geometry} to use when querying. (nullable)
+     * @param instant - The {@link TemporalInstant} to use when querying. (nullable)
+     * @param subject - The Subject that both statements must have when querying. (not null)
+     */
+    private Event(final Geometry geo, final TemporalInstant instant, final RyaURI subject) {
+        this.subject = requireNonNull(subject);
+
+        //these fields are nullable since they are filled field by field.
+        this.instant = Optional.ofNullable(instant);
+        geometry = Optional.ofNullable(geo);
+        isInstant = true;
+        interval = Optional.empty();
+    }
+
+    /**
+     * Creates a new {@link Event} query object with a {@link TemporalInterval}.
+     * @param geo - The {@link Geometry} to use when querying. (nullable)
+     * @param interval - The {@link TemporalInterval} to use when querying. (nullable)
+     * @param subject - The Subject that both statements must have when querying. (not null)
+     */
+    private Event(final Geometry geo, final TemporalInterval interval, final RyaURI subject) {
+        this.subject = requireNonNull(subject);
+
+        //these fields are nullable since they are filled field by field.
+        this.interval = Optional.ofNullable(interval);
+        geometry = Optional.ofNullable(geo);
+        isInstant = false;
+        instant = Optional.empty();
+    }
+
+    /**
+     * @return Whether or not the query object uses a {@link TemporalInstant}.
+     */
+    public boolean isInstant() {
+        return isInstant;
+    }
+
+    /**
+     * @return The {@link Geometry} to use when querying.
+     */
+    public Optional<Geometry> getGeometry() {
+        return geometry;
+    }
+
+    /**
+     * @return The {@link TemporalInstant} to use when querying.
+     */
+    public Optional<TemporalInstant> getInstant() {
+        return instant;
+    }
+
+    /**
+     * @return The {@link TemporalInterval} to use when querying.
+     */
+    public Optional<TemporalInterval> getInterval() {
+        return interval;
+    }
+
+    /**
+     * @return The statement subject.
+     */
+    public RyaURI getSubject() {
+        return subject;
+    }
+
+    @Override
+    public int hashCode() {
+        if(isInstant) {
+            return Objects.hash(subject, geometry, instant);
+        } else {
+            return Objects.hash(subject, geometry, interval);
+        }
+    }
+
+    /**
+     * Two events are equal when they have the same subject, the same geometry,
+     * the same temporal kind (instant vs. interval) and the same temporal value.
+     * These are the fields {@link #hashCode()} hashes, so equal events always
+     * share a hash code.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if(this == o) {
+            return true;
+        }
+        if(!(o instanceof Event)) {
+            return false;
+        }
+        final Event event = (Event) o;
+        // NOTE(review): the geometry comparison goes through Geometry.equals(Object);
+        // confirm the JTS version in use compares geometries by value there.
+        return isInstant == event.isInstant &&
+                Objects.equals(subject, event.subject) &&
+                Objects.equals(geometry, event.geometry) &&
+                (isInstant ? Objects.equals(instant, event.instant) : Objects.equals(interval, event.interval));
+    }
+
+    /**
+     * Creates a {@link Builder} pre-populated from an existing {@link Event}.
+     * @param event - The event whose subject, geometry and temporal value are copied. (not null)
+     * @return A builder holding the values that are present on {@code event}.
+     */
+    public static Builder builder(final Event event) {
+        final Builder builder = new Builder()
+                .setSubject(event.getSubject());
+        event.getGeometry().ifPresent(builder::setGeometry);
+        // Only the temporal value matching the event's kind is copied.
+        if(event.isInstant()) {
+            event.getInstant().ifPresent(builder::setTemporalInstant);
+        } else {
+            event.getInterval().ifPresent(builder::setTemporalInterval);
+        }
+        return builder;
+    }
+
+    /**
+     * @return An empty {@link Builder}.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builds instances of {@link Event}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static class Builder {
+        private RyaURI subject;
+        private Geometry geo;
+        private TemporalInstant instant;
+        private TemporalInterval interval;
+
+        /**
+         * Sets the {@link RyaURI} subject.
+         * @param subject - The subject to key on the event.
+         * @return This builder.
+         */
+        public Builder setSubject(final RyaURI subject) {
+            this.subject = subject;
+            return this;
+        }
+
+        /**
+         * Sets the {@link Geometry}.
+         * @param geo - The geometry.
+         * @return This builder.
+         */
+        public Builder setGeometry(final Geometry geo) {
+            this.geo = geo;
+            return this;
+        }
+
+        /**
+         * Sets the {@link TemporalInterval}.
+         * @param interval - The interval.
+         * @return This builder.
+         */
+        public Builder setTemporalInterval(final TemporalInterval interval) {
+            this.interval = interval;
+            return this;
+        }
+
+        /**
+         * Sets the {@link TemporalInstant}.
+         * @param instant - The instant.
+         * @return This builder.
+         */
+        public Builder setTemporalInstant(final TemporalInstant instant) {
+            this.instant = instant;
+            return this;
+        }
+
+        /**
+         * Builds the event. When an instant has been set it takes precedence and
+         * any interval is ignored; otherwise an interval event is built (its
+         * interval may be absent).
+         *
+         * @return The new {@link Event}.
+         * @throws NullPointerException If no subject has been set.
+         */
+        public Event build() {
+            if(instant == null) {
+                return new Event(geo, interval, subject);
+            } else {
+                return new Event(geo, instant, subject);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
new file mode 100644
index 0000000..6953714
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.model;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
+import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
+import org.openrdf.query.algebra.evaluation.iterator.CollectionIteration;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.vividsolutions.jts.geom.Geometry;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * An {@link ExternalSet} that answers one geo {@link StatementPattern} and one
+ * temporal {@link StatementPattern} sharing a subject by searching an
+ * {@link EventStorage} with the associated geo and temporal filters.
+ */
+public class EventQueryNode extends ExternalSet implements ExternalBatchingIterator {
+    private final Collection<FunctionCall> usedFilters;
+    private final Collection<IndexingExpr> geoFilters;
+    private final Collection<IndexingExpr> temporalFilters;
+
+    private final StatementPattern geoPattern;
+    private final StatementPattern temporalPattern;
+
+    //Information about the subject of the patterns.
+    private final boolean subjectIsConstant;
+    private final Optional<String> subjectConstant;
+    private final Optional<String> subjectVar;
+
+    //since an EventQueryNode exists in a single segment, all binding names are guaranteed to be assured.
+    // NOTE(review): nothing in this class adds to this set, so it is always empty - confirm that is intended.
+    private final Set<String> bindingNames;
+
+    // Lazily created by getPatterns(); holds the geo pattern followed by the temporal pattern.
+    private Collection<StatementPattern> patterns;
+
+    private final EventStorage eventStore;
+
+    /**
+     * Constructs an instance of {@link EventQueryNode}.
+     *
+     * @param eventStore - The {@link EventStorage} that will be searched when
+     *   evaluating a query. (not null)
+     * @param geoPattern - The pattern whose object is bound to an event's geometry. (not null)
+     * @param temporalPattern - The pattern whose object is bound to an event's
+     *   instant or interval. (not null)
+     * @param geoFilters - The geo filters handed to the storage search. (not null)
+     * @param temporalFilters - The temporal filters handed to the storage search. (not null)
+     * @param usedFilters - The filter function calls reported by {@link #getFilters()}. (not null)
+     * @throws IllegalStateException If the two patterns do not share the same
+     *   subject, or a pattern's predicate is not constant.
+     */
+    public EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException {
+        this.geoPattern = requireNonNull(geoPattern);
+        this.temporalPattern = requireNonNull(temporalPattern);
+        this.geoFilters = requireNonNull(geoFilters);
+        this.temporalFilters = requireNonNull(temporalFilters);
+        this.eventStore = requireNonNull(eventStore);
+        this.usedFilters = requireNonNull(usedFilters);
+        bindingNames = new HashSet<>();
+
+        // Subject based preconditions.
+        verifySameSubjects(getPatterns());
+        // Predicate based preconditions.
+        verifyAllPredicatesAreConstants(getPatterns());
+
+        // The Subject may either be constant or a variable.
+        final Var subject = patterns.iterator().next().getSubjectVar();
+        subjectIsConstant = subject.isConstant();
+        if(subjectIsConstant) {
+            subjectConstant = Optional.of( subject.getValue().toString() );
+            subjectVar = Optional.empty();
+        } else {
+            subjectConstant = Optional.empty();
+            subjectVar = Optional.of( subject.getName() );
+        }
+    }
+
+    @Override
+    public Set<String> getBindingNames() {
+        return bindingNames;
+    }
+
+    @Override
+    public Set<String> getAssuredBindingNames() {
+        return bindingNames;
+    }
+
+    /**
+     * Verify the Subject for all of the patterns is the same.
+     *
+     * @param patterns - The patterns to check. (not null, not empty)
+     * @throws IllegalStateException If all of the Subjects are not the same.
+     */
+    private static void verifySameSubjects(final Collection<StatementPattern> patterns) throws IllegalStateException {
+        requireNonNull(patterns);
+
+        final Iterator<StatementPattern> it = patterns.iterator();
+        final Var subject = it.next().getSubjectVar();
+
+        while(it.hasNext()) {
+            final StatementPattern pattern = it.next();
+            if(!pattern.getSubjectVar().equals(subject)) {
+                throw new IllegalStateException("At least one of the patterns has a different subject from the others. " +
+                        "All subjects must be the same.");
+            }
+        }
+    }
+
+    /**
+     * Verifies all of the Statement Patterns have Constants for their predicates.
+     *
+     * @param patterns - The patterns to check. (not null)
+     * @throws IllegalStateException A pattern has a variable predicate.
+     */
+    private static void verifyAllPredicatesAreConstants(final Collection<StatementPattern> patterns) throws IllegalStateException {
+        requireNonNull(patterns);
+
+        for(final StatementPattern pattern : patterns) {
+            if(!pattern.getPredicateVar().isConstant()) {
+                throw new IllegalStateException("The Predicate of a Statement Pattern must be constant. Pattern: " + pattern);
+            }
+        }
+    }
+
+    /**
+     * Searches the {@link EventStorage} and produces one binding set per event found.
+     * Each result binds the geo pattern's object variable to the event's geometry
+     * (as WKT text) and the temporal pattern's object variable to the event's
+     * instant or interval, when those values are present on the event.
+     *
+     * @param bindings - The incoming bindings. When non-empty they are appended
+     *   to the results as an additional row; they are not used to constrain the search.
+     * @return An iteration over the produced binding sets.
+     * @throws QueryEvaluationException If the storage search fails.
+     */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) throws QueryEvaluationException {
+        final List<BindingSet> list = new ArrayList<>();
+        try {
+            // Restrict the search to the constant subject when there is one;
+            // a variable subject is searched for using the filters alone.
+            final Optional<RyaURI> subject = subjectIsConstant
+                    ? Optional.of(new RyaURI(subjectConstant.get()))
+                    : Optional.<RyaURI>empty();
+            final Collection<Event> searchEvents = eventStore.search(subject, Optional.of(geoFilters), Optional.of(temporalFilters));
+
+            for(final Event event : searchEvents) {
+                final MapBindingSet resultSet = new MapBindingSet();
+                if(event.getGeometry().isPresent()) {
+                    final Geometry geo = event.getGeometry().get();
+                    final Value geoValue = ValueFactoryImpl.getInstance().createLiteral(geo.toText());
+                    final Var geoObj = geoPattern.getObjectVar();
+                    resultSet.addBinding(geoObj.getName(), geoValue);
+                }
+
+                final Value temporalValue;
+                if(event.isInstant() && event.getInstant().isPresent()) {
+                    temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInstant().get().getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER));
+                } else if(event.getInterval().isPresent()) {
+                    temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInterval().get().getAsPair());
+                } else {
+                    temporalValue = null;
+                }
+
+                if(temporalValue != null) {
+                    final Var temporalObj = temporalPattern.getObjectVar();
+                    resultSet.addBinding(temporalObj.getName(), temporalValue);
+                }
+                list.add(resultSet);
+            }
+        } catch (final ObjectStorageException e) {
+            throw new QueryEvaluationException("Failed to evaluate the binding set", e);
+        }
+        if(bindings.size() != 0) {
+            list.add(bindings);
+        }
+        return new CollectionIteration<>(list);
+    }
+
+    /**
+     * @return The geo filters handed to the storage search.
+     */
+    public Collection<IndexingExpr> getGeoFilters() {
+        return geoFilters;
+    }
+
+    /**
+     * @return The temporal filters handed to the storage search.
+     */
+    public Collection<IndexingExpr> getTemporalFilters() {
+        return temporalFilters;
+    }
+
+    /**
+     * @return The filter function calls this node was constructed with.
+     */
+    public Collection<FunctionCall> getFilters() {
+        return usedFilters;
+    }
+
+    /**
+     * @return The geo pattern followed by the temporal pattern. The collection
+     *   is created on first use and the same instance is returned afterwards.
+     */
+    public Collection<StatementPattern> getPatterns() {
+        if(patterns == null) {
+            patterns = new ArrayList<>();
+            patterns.add(geoPattern);
+            patterns.add(temporalPattern);
+        }
+        return patterns;
+    }
+
+    @Override
+    public int hashCode() {
+        // Hash exactly the fields equals() compares, so equal nodes always
+        // share a hash code. The event store is not part of equality.
+        return Objects.hash(subjectIsConstant,
+                subjectVar,
+                subjectConstant,
+                geoFilters,
+                temporalFilters,
+                geoPattern,
+                temporalPattern,
+                bindingNames);
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if(other instanceof EventQueryNode) {
+            final EventQueryNode otherNode = (EventQueryNode)other;
+
+            return Objects.equals(subjectIsConstant, otherNode.subjectIsConstant) &&
+                    Objects.equals(subjectVar, otherNode.subjectVar) &&
+                    Objects.equals(geoFilters, otherNode.geoFilters) &&
+                    Objects.equals(geoPattern, otherNode.geoPattern) &&
+                    Objects.equals(temporalFilters, otherNode.temporalFilters) &&
+                    Objects.equals(temporalPattern, otherNode.temporalPattern) &&
+                    Objects.equals(bindingNames, otherNode.bindingNames) &&
+                    Objects.equals(subjectConstant, otherNode.subjectConstant);
+        }
+        return false;
+    }
+
+    /**
+     * @return A new node constructed from this node's store, patterns and filters.
+     */
+    @Override
+    public EventQueryNode clone() {
+        return new EventQueryNode(eventStore, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Geo Pattern: " + geoPattern.toString());
+        sb.append("\n--Geo Filters--\n");
+        for(final IndexingExpr filter : geoFilters) {
+            sb.append(filter.toString());
+            sb.append("\n");
+        }
+        sb.append("\n-------------------\n");
+        sb.append("Temporal Pattern: " + temporalPattern.toString());
+        sb.append("\n--Temporal Filters--\n");
+        for(final IndexingExpr filter : temporalFilters) {
+            sb.append(filter.toString());
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Batch evaluation.
+     * NOTE(review): this is not implemented and returns {@code null}; callers
+     * that iterate the result will fail. Confirm whether batching is expected
+     * to be supported by this node.
+     */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
+            throws QueryEvaluationException {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/af5964aa/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java
new file mode 100644
index 0000000..47c18a0
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/storage/EventStorage.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.storage;
+
+import java.util.Collection;
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage;
+
+/**
+ * A {@link RyaObjectStorage} of {@link Event}s that can additionally be
+ * searched by subject and by geo and temporal filters.
+ */
+public interface EventStorage extends RyaObjectStorage<Event> {
+ /**
+ * Search for {@link Event}s in the storage.
+ * Will query based on present parameters.
+ *
+ * @param subject - The subject key to find events.
+ * @param geoFilters - The geo filters to find Events.
+ * @param temporalFilters - The temporal filters to find Events.
+ * @return The {@link Event}s found by the search.
+ * @throws ObjectStorageException A problem occurred while fetching the Events from the storage.
+ */
+ public Collection<Event> search(final Optional<RyaURI> subject, Optional<Collection<IndexingExpr>> geoFilters, Optional<Collection<IndexingExpr>> temporalFilters) throws ObjectStorageException;
+
+ /**
+ * Indicates a problem while interacting with an {@link EventStorage}.
+ */
+ public static class EventStorageException extends ObjectStorageException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public EventStorageException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public EventStorageException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * An {@link Event} could not be created because one already exists for the Subject.
+ */
+ public static class EventAlreadyExistsException extends EventStorageException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message the detail message.
+ */
+ public EventAlreadyExistsException(final String message) {
+ super(message);
+ }
+
+ /**
+ * @param message the detail message.
+ * @param cause the cause of this exception.
+ */
+ public EventAlreadyExistsException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * An {@link Event} could not be updated because the old state does not
+ * match the current state.
+ */
+ public static class StaleUpdateException extends EventStorageException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message the detail message.
+ */
+ public StaleUpdateException(final String message) {
+ super(message);
+ }
+
+ /**
+ * @param message the detail message.
+ * @param cause the cause of this exception.
+ */
+ public StaleUpdateException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * A {@link EventFilter} is a translation from an {@link IndexingExpr}
+ * to a format the {@link GeoTemporalIndexer} can use to easily determine which
+ * filter function is being used.
+ *
+ * @param <T> - The type of the query object the filter is translated into.
+ */
+ interface EventFilter<T> {
+ /**
+ * Gets the translated query friendly form of the filter.
+ */
+ public T getQueryObject();
+ }
+
+ /**
+ * Factory for getting the {@link EventFilter} from an {@link IndexingExpr}.
+ *
+ * @param <T> - The query object type of the {@link EventFilter}s produced.
+ */
+ interface EventFilterFactory<T> {
+ /**
+ * @param filter - The {@link IndexingExpr} to translate.
+ * @return The {@link EventFilter} for the supplied filter.
+ */
+ public EventFilter<T> getSearchFunction(final IndexingExpr filter);
+ }
+}
[2/5] incubator-rya git commit: RYA-237 Mongo GeoTemporal Indexer
Posted by mi...@apache.org.
RYA-237 Mongo GeoTemporal Indexer
Mongo support for the GeoTemporalIndexer
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/63095d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/63095d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/63095d45
Branch: refs/heads/master
Commit: 63095d45a357da622343b42ed5ce22fd8217cc80
Parents: af5964a
Author: isper3at <sm...@gmail.com>
Authored: Wed Jan 18 15:38:11 2017 -0500
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Wed Jun 14 13:27:56 2017 -0400
----------------------------------------------------------------------
.../mongo/EventDocumentConverter.java | 147 ++++++++++
.../geotemporal/mongo/EventUpdater.java | 85 ++++++
.../GeoTemporalMongoDBStorageStrategy.java | 293 +++++++++++++++++++
.../geotemporal/mongo/MongoEventStorage.java | 196 +++++++++++++
.../mongo/MongoGeoTemporalIndexer.java | 222 ++++++++++++++
5 files changed, 943 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
new file mode 100644
index 0000000..a41428e
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.bson.Document;
+import org.bson.types.BasicBSONList;
+import org.joda.time.DateTime;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+
+public class EventDocumentConverter implements DocumentConverter<Event>{
+ public static final String SUBJECT = "_id";
+ public static final String GEO_KEY = "location";
+ public static final String INTERVAL_START = "start";
+ public static final String INTERVAL_END = "end";
+ public static final String INSTANT = "instant";
+
+ private final GeoMongoDBStorageStrategy geoAdapter = new GeoMongoDBStorageStrategy(0);
+
+ @Override
+ public Document toDocument(final Event event) {
+ requireNonNull(event);
+
+ final Document doc = new Document();
+ doc.append(SUBJECT, event.getSubject().getData());
+
+ if(event.getGeometry().isPresent()) {
+ final BasicBSONList points = new BasicBSONList();
+ for(final double[] point : geoAdapter.getCorrespondingPoints(event.getGeometry().get())) {
+ final BasicBSONList pointDoc = new BasicBSONList();
+ for(final double p : point) {
+ pointDoc.add(p);
+ }
+ points.add(pointDoc);
+ }
+
+ doc.append(GEO_KEY, points);
+ }
+ if(event.isInstant()) {
+ if(event.getInstant().isPresent()) {
+ doc.append(INSTANT, event.getInstant().get().getAsDateTime().toDate());
+ }
+ } else {
+ if(event.getInterval().isPresent()) {
+ doc.append(INTERVAL_START, event.getInterval().get().getHasBeginning().getAsDateTime().toDate());
+ doc.append(INTERVAL_END, event.getInterval().get().getHasEnd().getAsDateTime().toDate());
+ }
+ }
+
+ return doc;
+ }
+
+ @Override
+ public Event fromDocument(final Document document) throws DocumentConverterException {
+ requireNonNull(document);
+
+ final boolean isInstant;
+
+ // Preconditions.
+ if(!document.containsKey(SUBJECT)) {
+ throw new DocumentConverterException("Could not convert document '" + document +
+ "' because its '" + SUBJECT + "' field is missing.");
+ }
+
+ if(document.containsKey(INSTANT)) {
+ isInstant = true;
+ } else {
+ isInstant = false;
+ }
+
+ final String subject = document.getString(SUBJECT);
+
+ final Event.Builder builder = new Event.Builder()
+ .setSubject(new RyaURI(subject));
+
+ if(document.containsKey(GEO_KEY)) {
+ final List<List<Double>> pointsList = (List<List<Double>>) document.get(GEO_KEY);
+ final Coordinate[] coords = new Coordinate[pointsList.size()];
+
+ int ii = 0;
+ for(final List<Double> point : pointsList) {
+ coords[ii] = new Coordinate(point.get(0), point.get(1));
+ ii++;
+ }
+
+ final GeometryFactory geoFact = new GeometryFactory();
+ final Geometry geo;
+ if(coords.length == 1) {
+ geo = geoFact.createPoint(coords[0]);
+ } else {
+ geo = geoFact.createPolygon(coords);
+ }
+ builder.setGeometry(geo);
+ }
+
+ if(isInstant) {
+ //we already know the key exists
+ final Date date = (Date) document.get(INSTANT);
+ final DateTime dt = new DateTime(date.getTime());
+ final TemporalInstant instant = new TemporalInstantRfc3339(dt);
+ builder.setTemporalInstant(instant);
+ } else if(document.containsKey(INTERVAL_START)){
+ Date date = (Date) document.get(INTERVAL_START);
+ DateTime dt = new DateTime(date.getTime());
+ final TemporalInstant begining = new TemporalInstantRfc3339(dt);
+
+ date = (Date) document.get(INTERVAL_END);
+ dt = new DateTime(date.getTime());
+ final TemporalInstant end = new TemporalInstantRfc3339(dt);
+
+ final TemporalInterval interval = new TemporalInterval(begining, end);
+ builder.setTemporalInterval(interval);
+ }
+ return builder.build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
new file mode 100644
index 0000000..c9f4658
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException;
+import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link DocumentUpdater} that carries out its read, create, update and
+ * delete operations against an {@link EventStorage}. Every
+ * {@link ObjectStorageException} raised by the storage is rethrown as an
+ * {@link EventStorageException} carrying the same message and the original
+ * exception as its cause.
+ */
+@DefaultAnnotation(NonNull.class)
+public class EventUpdater implements DocumentUpdater<RyaURI, Event>{
+    private final EventStorage storage;
+
+    /**
+     * Constructs an instance of {@link EventUpdater}
+     *
+     * @param events - The storage this updater operates over. (not null)
+     */
+    public EventUpdater(final EventStorage events) {
+        storage = requireNonNull(events);
+    }
+
+    /**
+     * Fetches the currently stored {@link Event} for a subject.
+     *
+     * @param key - The subject of the event to look up.
+     * @return Whatever the storage returns for the key.
+     * @throws EventStorageException If the storage fails.
+     */
+    @Override
+    public Optional<Event> getOld(final RyaURI key) throws EventStorageException {
+        try {
+            return storage.get(key);
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * Stores a new {@link Event}.
+     *
+     * @param newObj - The event to create.
+     * @throws EventStorageException If the storage fails.
+     */
+    @Override
+    public void create(final Event newObj) throws EventStorageException {
+        try {
+            storage.create(newObj);
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * Replaces a stored {@link Event} with an updated one.
+     *
+     * @param old - The state the event is expected to have.
+     * @param updated - The state to store.
+     * @throws EventStorageException If the storage fails.
+     */
+    @Override
+    public void update(final Event old, final Event updated) throws EventStorageException {
+        try {
+            storage.update(old, updated);
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * Deletes the stored {@link Event} that has the supplied event's subject.
+     *
+     * @param event - The event whose subject identifies what to delete.
+     * @throws EventStorageException If the storage fails.
+     */
+    public void delete(final Event event) throws EventStorageException {
+        try {
+            storage.delete(event.getSubject());
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
new file mode 100644
index 0000000..352dcb6
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INSTANT;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_END;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_START;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.GeoPolicy;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.TemporalPolicy;
+import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery;
+import org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.MalformedQueryException;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+import jline.internal.Log;
+
+/**
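+ * Storage strategy for combined geo/temporal data in MongoDB. It serializes
+ * geo and temporal statements into documents and builds the filter queries
+ * for them by delegating to a {@link GeoMongoDBStorageStrategy} and a
+ * {@link TemporalMongoDBStorageStrategy}.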
+ */
+public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
+ private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class);
+ private static final String GEO_KEY = "location";
+ private static final String TIME_KEY = "time";
+ private final TemporalMongoDBStorageStrategy temporalStrategy;
+ private final GeoMongoDBStorageStrategy geoStrategy;
+
+ /**
+  * Constructs an instance of {@link GeoTemporalMongoDBStorageStrategy} along
+  * with the geo and temporal strategies it delegates to.
+  */
+ public GeoTemporalMongoDBStorageStrategy() {
+     this.geoStrategy = new GeoMongoDBStorageStrategy(0);
+     this.temporalStrategy = new TemporalMongoDBStorageStrategy();
+ }
+
+ /**
+  * Creates one index on the geo field and one on the time field.
+  *
+  * @param coll - The collection the indices are created on.
+  */
+ @Override
+ public void createIndices(final DBCollection coll){
+     for (final String indexedField : new String[] { GEO_KEY, TIME_KEY }) {
+         coll.createIndex(indexedField);
+     }
+ }
+
+ /**
+  * Builds the Mongo query object for a set of geo and temporal filters.
+  * <p>
+  * With filters of only one kind the result is that kind's filter (a single
+  * object, or an "and" of several). With both kinds the two are "and"-ed
+  * together. With no filters at all an empty query is returned.
+  *
+  * @param geoFilters - The geo filter expressions. (may be empty)
+  * @param temporalFilters - The temporal filter expressions. (may be empty)
+  * @return The query object representing all of the filters.
+  * @throws GeoTemporalIndexException Declared for callers; see the helpers
+  *   used to translate the individual filters.
+  */
+ public DBObject getFilterQuery(final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters) throws GeoTemporalIndexException {
+     final boolean hasGeo = !geoFilters.isEmpty();
+     final boolean hasTemporal = !temporalFilters.isEmpty();
+
+     if(!hasGeo && !hasTemporal) {
+         return QueryBuilder.start().get();
+     }
+     if(!hasGeo) {
+         return oneOrAnd(getTemporalObjs(temporalFilters));
+     }
+
+     final DBObject[] geoClauses = getGeoObjs(geoFilters);
+     if(!hasTemporal) {
+         return oneOrAnd(geoClauses);
+     }
+
+     final DBObject[] temporalClauses = getTemporalObjs(temporalFilters);
+     return QueryBuilder.start()
+             .and(oneOrAnd(geoClauses), oneOrAnd(temporalClauses))
+             .get();
+ }
+
+ /**
+  * Collapses an array of query objects into one: a lone element is returned
+  * as is, anything else is wrapped in an "and" query.
+  */
+ private DBObject oneOrAnd(final DBObject[] dbos) {
+     return dbos.length == 1
+             ? dbos[0]
+             : QueryBuilder.start().and(dbos).get();
+ }
+
+ @Override
+ public DBObject serialize(final RyaStatement ryaStatement) {
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start("_id", ryaStatement.getSubject().hashCode());
+ final URI obj = ryaStatement.getObject().getDataType();
+
+
+ if(obj.equals(GeoConstants.GEO_AS_WKT) || obj.equals(GeoConstants.GEO_AS_GML) ||
+ obj.equals(GeoConstants.XMLSCHEMA_OGC_GML) || obj.equals(GeoConstants.XMLSCHEMA_OGC_WKT)) {
+ try {
+ final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+ final Geometry geo = GeoParseUtils.getGeometry(statement);
+ builder.add(GEO_KEY, geoStrategy.getCorrespondingPoints(geo));
+ } catch (final ParseException e) {
+ LOG.error("Could not create geometry for statement " + ryaStatement, e);
+ return null;
+ }
+ } else {
+ builder.add(TIME_KEY, temporalStrategy.getTimeValue(ryaStatement.getObject().getData()));
+ }
+ return builder.get();
+ }
+
+ /**
+  * Translates every geo filter into its Mongo query object.
+  * <p>
+  * A filter whose WKT argument cannot be parsed, or whose function is not
+  * supported, is logged and left out of the result; the remaining filters
+  * are still translated.
+  *
+  * @param geoFilters - The geo filter expressions to translate.
+  * @return One query object per filter that could be translated.
+  */
+ private DBObject[] getGeoObjs(final Collection<IndexingExpr> geoFilters) {
+     final List<DBObject> objs = new ArrayList<>();
+     // One reader is enough for the whole call; it is only used on this thread.
+     final WKTReader reader = new WKTReader();
+     for (final IndexingExpr filter : geoFilters) {
+         final GeoPolicy policy = GeoPolicy.fromURI(filter.getFunction());
+         final String geoStr = filter.getArguments()[0].stringValue();
+         try {
+             //This method is what is used in the GeoIndexer.
+             final Geometry geo = reader.read(geoStr);
+             objs.add(getGeoObject(geo, policy));
+         } catch (final GeoTemporalIndexException | UnsupportedOperationException | ParseException e) {
+             // Report through the class logger rather than jline's internal console logger.
+             LOG.error("Unable to parse '" + geoStr + "'.", e);
+         }
+     }
+     return objs.toArray(new DBObject[0]);
+ }
+
+ /**
+  * Translates every temporal filter into its Mongo query object.
+  * <p>
+  * The filter argument is treated as an interval when it matches
+  * {@link TemporalInstantRfc3339#PATTERN}, otherwise as an instant. When the
+  * filter function does not fit the kind of argument, the mismatch is logged
+  * and the filter still contributes the (empty) object produced by the
+  * corresponding {@code getTemporalObject} overload.
+  *
+  * @param temporalFilters - The temporal filter expressions to translate.
+  * @return One query object per filter.
+  */
+ private DBObject[] getTemporalObjs(final Collection<IndexingExpr> temporalFilters) {
+     final List<DBObject> objs = new ArrayList<>();
+     for (final IndexingExpr filter : temporalFilters) {
+         final TemporalPolicy policy = TemporalPolicy.fromURI(filter.getFunction());
+         final String timeStr = filter.getArguments()[0].stringValue();
+         final boolean instantPolicy =
+                 policy == TemporalPolicy.INSTANT_AFTER_INSTANT ||
+                 policy == TemporalPolicy.INSTANT_BEFORE_INSTANT ||
+                 policy == TemporalPolicy.INSTANT_EQUALS_INSTANT;
+
+         final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(timeStr);
+         if(matcher.find()) {
+             final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(timeStr);
+             // An interval argument combined with an instant-vs-instant function.
+             if(instantPolicy) {
+                 LOG.error("Cannot perform temporal interval based queries on an instant.");
+             }
+             objs.add(getTemporalObject(interval, policy));
+         } else {
+             final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(timeStr));
+             // An instant argument combined with an interval function.
+             if(!instantPolicy) {
+                 LOG.error("Cannot perform temporal instant based queries on an interval.");
+             }
+             objs.add(getTemporalObject(instant, policy));
+         }
+     }
+     return objs.toArray(new DBObject[0]);
+ }
+
+ private DBObject getGeoObject (final Geometry geo, final GeoPolicy policy) throws GeoTemporalIndexException {
+ switch(policy) {
+ case CONTAINS:
+ throw new UnsupportedOperationException("Contains queries are not supported in Mongo DB.");
+ case CROSSES:
+ throw new UnsupportedOperationException("Crosses queries are not supported in Mongo DB.");
+ case DISJOINT:
+ throw new UnsupportedOperationException("Disjoint queries are not supported in Mongo DB.");
+ case EQUALS:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(EQUALS, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ case INTERSECTS:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(INTERSECTS, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ case OVERLAPS:
+ throw new UnsupportedOperationException("Overlaps queries are not supported in Mongo DB.");
+ case TOUCHES:
+ throw new UnsupportedOperationException("Touches queries are not supported in Mongo DB.");
+ case WITHIN:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(WITHIN, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ default:
+ return new BasicDBObject();
+ }
+ }
+
+ /**
+  * Builds the Mongo query object that compares the stored instant with the
+  * provided instant.
+  *
+  * @param instant - The instant to compare against.
+  * @param policy - The temporal function to translate.
+  * @return The comparison query, or an empty object when the function is
+  *   not one of the instant-vs-instant functions.
+  */
+ private DBObject getTemporalObject(final TemporalInstant instant, final TemporalPolicy policy) {
+     switch(policy) {
+         case INSTANT_AFTER_INSTANT:
+             return QueryBuilder.start(INSTANT)
+                     .greaterThan(instant.getAsDateTime().toDate())
+                     .get();
+         case INSTANT_BEFORE_INSTANT:
+             return QueryBuilder.start(INSTANT)
+                     .lessThan(instant.getAsDateTime().toDate())
+                     .get();
+         case INSTANT_EQUALS_INSTANT:
+             return QueryBuilder.start(INSTANT)
+                     .is(instant.getAsDateTime().toDate())
+                     .get();
+         default:
+             return new BasicDBObject();
+     }
+ }
+
+ /**
+  * Builds the Mongo query object that compares stored temporal values with
+  * the provided interval.
+  * <p>
+  * The INSTANT_* functions compare the stored instant with the interval's
+  * bounds; the INTERVAL_* functions compare the stored interval's start
+  * and/or end with them.
+  *
+  * @param interval - The interval to compare against.
+  * @param policy - The temporal function to translate.
+  * @return The comparison query, or an empty object when the function is
+  *   not one of the interval functions.
+  */
+ private DBObject getTemporalObject(final TemporalInterval interval, final TemporalPolicy policy) {
+     // The interval is only dereferenced inside the matching case, exactly as needed.
+     switch(policy) {
+         case INSTANT_AFTER_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_BEFORE_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_END_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .is(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_IN_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .greaterThan(interval.getHasBeginning().getAsDateTime().toDate())
+                     .lessThan(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_START_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .is(interval.getHasBeginning().getAsDateTime().toDate())
+                     .get();
+         case INTERVAL_AFTER:
+             return QueryBuilder.start(INTERVAL_START)
+                     .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INTERVAL_BEFORE:
+             return QueryBuilder.start(INTERVAL_END)
+                     .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+                     .get();
+         case INTERVAL_EQUALS:
+             return QueryBuilder.start(INTERVAL_START)
+                     .is(interval.getHasBeginning().getAsDateTime().toDate())
+                     .and(INTERVAL_END)
+                     .is(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         default:
+             return new BasicDBObject();
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
new file mode 100644
index 0000000..8ddf075
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.entity.model.TypedEntity;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
+import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBObject;
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+
+public class MongoEventStorage implements EventStorage {
+
+ protected static final String COLLECTION_NAME = "geotemporal-events";
+
+ private static final EventDocumentConverter EVENT_CONVERTER = new EventDocumentConverter();
+
+ /**
+ * A client connected to the Mongo instance that hosts the Rya instance.
+ */
+ protected final MongoClient mongo;
+
+ /**
+ * The name of the Rya instance the {@link Event}s are for.
+ */
+ protected final String ryaInstanceName;
+
+ /*
+ * Used to get the filter query objects.
+ */
+ private final GeoTemporalMongoDBStorageStrategy queryAdapter;
+
+ /**
+ * Constructs an instance of {@link MongoEventStorage}.
+ *
+ * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null)
+ * @param ryaInstanceName - The name of the Rya instance the {@link Event}s are for. (not null)
+ */
+ public MongoEventStorage(final MongoClient mongo, final String ryaInstanceName) {
+ this.mongo = requireNonNull(mongo);
+ this.ryaInstanceName = requireNonNull(ryaInstanceName);
+ queryAdapter = new GeoTemporalMongoDBStorageStrategy();
+ }
+
+ /**
+  * Inserts a new {@link Event} document into the events collection.
+  *
+  * @param event - The Event to store. (not null)
+  * @throws EventAlreadyExistsException Mongo reported a duplicate key.
+  * @throws EventStorageException Any other Mongo failure.
+  */
+ @Override
+ public void create(final Event event) throws EventStorageException {
+     requireNonNull(event);
+
+     try {
+         final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName)
+                 .getCollection(COLLECTION_NAME);
+         collection.insertOne(EVENT_CONVERTER.toDocument(event));
+     } catch(final MongoException failure) {
+         final String message = "Failed to create Event with Subject '" + event.getSubject().getData() + "'.";
+         final boolean duplicate = ErrorCategory.fromErrorCode( failure.getCode() ) == ErrorCategory.DUPLICATE_KEY;
+         if(duplicate) {
+             throw new EventAlreadyExistsException(message, failure);
+         }
+         throw new EventStorageException(message, failure);
+     }
+ }
+
+ /**
+  * Looks up the {@link Event} stored for a subject.
+  *
+  * @param subject - The subject of the Event. (not null)
+  * @return The stored Event, or empty when no document has that subject.
+  * @throws EventStorageException The query failed or the document could not
+  *   be converted into an Event.
+  */
+ @Override
+ public Optional<Event> get(final RyaURI subject) throws EventStorageException {
+     requireNonNull(subject);
+
+     try {
+         final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName)
+                 .getCollection(COLLECTION_NAME);
+         final BsonDocument bySubject = new BsonDocument(EventDocumentConverter.SUBJECT, new BsonString(subject.getData()));
+         final Document document = collection.find(bySubject).first();
+
+         if(document == null) {
+             return Optional.empty();
+         }
+         return Optional.of( EVENT_CONVERTER.fromDocument(document) );
+
+     } catch(final MongoException | DocumentConverterException failure) {
+         throw new EventStorageException("Could not get the Event with Subject '" + subject.getData() + "'.", failure);
+     }
+ }
+
+ @Override
+ public Collection<Event> search(final Optional<RyaURI> subject, final Optional<Collection<IndexingExpr>> geoFilters, final Optional<Collection<IndexingExpr>> temporalFilters) throws EventStorageException {
+ requireNonNull(subject);
+
+ try {
+ final Collection<IndexingExpr> geos = (geoFilters.isPresent() ? geoFilters.get() : new ArrayList<>());
+ final Collection<IndexingExpr> tempos = (temporalFilters.isPresent() ? temporalFilters.get() : new ArrayList<>());
+ final DBObject filterObj = queryAdapter.getFilterQuery(geos, tempos);
+
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder
+ .start(filterObj.toMap());
+ if(subject.isPresent()) {
+ builder.append(EventDocumentConverter.SUBJECT, subject.get().getData());
+ }
+ final MongoCursor<Document> results = mongo.getDatabase(ryaInstanceName)
+ .getCollection(COLLECTION_NAME)
+ .find( BsonDocument.parse(builder.get().toString()) )
+ .iterator();
+
+ final List<Event> events = new ArrayList<>();
+ final EventDocumentConverter adapter = new EventDocumentConverter();
+ while(results.hasNext()) {
+ events.add(adapter.fromDocument(results.next()));
+ }
+ return events;
+ } catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) {
+ throw new EventStorageException("Could not get the Event.", e);
+ }
+ }
+
+ /**
+  * Replaces the stored {@link Event} that has the old Event's subject with
+  * the updated Event.
+  *
+  * @param old - The Event being replaced. (not null)
+  * @param updated - The Event that replaces it; must have the same subject. (not null)
+  * @throws StaleUpdateException No stored Event matched the old subject.
+  * @throws EventStorageException The subjects differ or Mongo reported a failure.
+  */
+ @Override
+ public void update(final Event old, final Event updated) throws StaleUpdateException, EventStorageException {
+     requireNonNull(old);
+     requireNonNull(updated);
+
+     // The updated entity must have the same Subject as the one it is replacing.
+     if(!old.getSubject().equals(updated.getSubject())) {
+         throw new EventStorageException("The old Event and the updated Event must have the same Subject. " +
+                 "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData());
+     }
+
+     // Do a find and replace of the document that matches the old entity's Subject.
+     final Document replaced;
+     try {
+         final Document updatedDoc = EVENT_CONVERTER.toDocument(updated);
+         replaced = mongo.getDatabase(ryaInstanceName)
+                 .getCollection(COLLECTION_NAME)
+                 .findOneAndReplace(makeSubjectFilter(old.getSubject()), updatedDoc);
+     } catch(final MongoException e) {
+         // Wrapped like the other operations of this storage so callers only deal with EventStorageException.
+         throw new EventStorageException("Could not update the Event with Subject '" + updated.getSubject().getData() + "'.", e);
+     }
+
+     if(replaced == null) {
+         throw new StaleUpdateException("Could not update the Event with Subject '" + updated.getSubject().getData() + "'.");
+     }
+ }
+
+ /**
+  * Deletes the {@link Event} stored for a subject.
+  *
+  * @param subject - The subject of the Event to delete. (not null)
+  * @return {@code true} when a document was found and deleted; otherwise {@code false}.
+  * @throws EventStorageException Mongo reported a failure.
+  */
+ @Override
+ public boolean delete(final RyaURI subject) throws EventStorageException {
+     requireNonNull(subject);
+
+     final Document removed;
+     try {
+         final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName)
+                 .getCollection(COLLECTION_NAME);
+         removed = collection.findOneAndDelete( makeSubjectFilter(subject) );
+     } catch(final MongoException failure) {
+         throw new EventStorageException("Could not delete the Event with Subject '" + subject.getData() + "'.", failure);
+     }
+     return removed != null;
+ }
+
+ private static Bson makeSubjectFilter(final RyaURI subject) {
+ return Filters.eq(EventDocumentConverter.SUBJECT, subject.getData());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
new file mode 100644
index 0000000..1baab18
--- /dev/null
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.mongodb.AbstractMongoIndexer;
+import org.apache.rya.indexing.mongodb.IndexingException;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+
+/**
+ * Indexer that stores 2 separate statements as one 'Event' entity.
+ * <p>
+ * The statements are required to have the same subject, one must be
+ * a Geo based statement and the other a temporal based statement.
+ * <p>
+ * This indexer is later used when querying for geo/temporal statements
+ * in the format of:
+ * <pre>
+ * {@code
+ * QUERY PARAMS
+ * ?SomeSubject geo:predicate ?Location
+ * ?SomeSubject time:predicate ?Time
+ * Filter(?Location, geoFunction())
+ * Filter(?Time, temporalFunction())
+ * }
+ * </pre>
+ *
+ * The number of filters is not strict, but there must be at least one
+ * query pattern for geo and one for temporal as well as at least one
+ * filter for each type.
+ */
+public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMongoDBStorageStrategy> implements GeoTemporalIndexer {
+ private static final Logger LOG = Logger.getLogger(MongoGeoTemporalIndexer.class);
+ public static final String GEO_TEMPORAL_COLLECTION = "geo_temporal";
+
+ private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>();
+ private final AtomicReference<EventStorage> events = new AtomicReference<>();
+
+ /**
+ * Initializes the indexer: runs the core initialization, collects the geo
+ * and temporal predicates from the configuration, and installs a new
+ * {@link GeoTemporalMongoDBStorageStrategy}.
+ */
+ @Override
+ public void init() {
+ initCore();
+ predicates = ConfigUtils.getGeoPredicates(conf);
+ predicates.addAll(ConfigUtils.getTemporalPredicates(conf));
+ storageStrategy = new GeoTemporalMongoDBStorageStrategy();
+ }
+
+ /**
+ * Sets the configuration of this indexer and rebuilds its event storage.
+ *
+ * @param conf - The configuration to use. (not null) It is cast to
+ * {@link MongoDBRdfConfiguration} by {@link #getEventStorage(Configuration)}.
+ */
+ @Override
+ public void setConf(final Configuration conf) {
+ requireNonNull(conf);
+ // Clear the cached storage first; getEventStorage() returns the cached one whenever it is set.
+ events.set(null);
+ events.set(getEventStorage(conf));
+ super.conf = conf;
+ configuration.set(new MongoDBRdfConfiguration(conf));
+ }
+
+ /**
+ * Folds a statement into the {@link Event} stored for the statement's subject.
+ *
+ * @param ryaStatement - The statement to index. (not null)
+ * @throws IOException The Event could not be updated.
+ */
+ @Override
+ public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+ requireNonNull(ryaStatement);
+
+ try {
+ updateEvent(ryaStatement.getSubject(), ryaStatement);
+ } catch (IndexingException | ParseException e) {
+ throw new IOException("Failed to update the Entity index.", e);
+ }
+ }
+
+ /**
+  * Removes the value a statement contributed to the {@link Event} stored for
+  * the statement's subject.
+  * <p>
+  * A geo statement clears the Event's geometry when it equals the stored
+  * one. Any other statement is treated as temporal and clears the stored
+  * interval or instant when it equals the statement's value. Nothing happens
+  * when no Event is stored for the subject or when the Event does not hold a
+  * value of the statement's kind.
+  *
+  * @param statement - The statement being deleted. (not null)
+  * @throws IOException The Event could not be updated.
+  */
+ @Override
+ public void deleteStatement(final RyaStatement statement) throws IOException {
+     requireNonNull(statement);
+     final RyaURI subject = statement.getSubject();
+     try {
+         final EventStorage eventStore = events.get();
+         // Check the storage itself; the AtomicReference holding it is never null.
+         checkState(eventStore != null, "Must set this indexers configuration before storing statements.");
+
+         new EventUpdater(eventStore).update(subject, old -> {
+             if(!old.isPresent()) {
+                 return Optional.empty();
+             }
+             final Event.Builder updated = Event.builder(old.get());
+             final Event currentEvent = updated.build();
+
+             final URI pred = statement.getObject().getDataType();
+             final boolean isGeo =
+                     pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) ||
+                     pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML);
+
+             if(isGeo) {
+                 // A geo statement must never fall through to the temporal parsing below,
+                 // even when the Event holds no geometry.
+                 if(currentEvent.getGeometry().isPresent()) {
+                     try {
+                         if(currentEvent.getGeometry().get().equals(GeoParseUtils.getGeometry(RyaToRdfConversions.convertStatement(statement)))) {
+                             updated.setGeometry(null);
+                         }
+                     } catch (final Exception e) {
+                         // Best effort: an unparsable geometry leaves the Event untouched.
+                         LOG.debug("Unable to parse the stored geometry.", e);
+                     }
+                 }
+             } else {
+                 //is time
+                 final String dateTime = statement.getObject().getData();
+                 final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+                 if (matcher.find()) {
+                     final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+                     if(currentEvent.getInterval().isPresent() && currentEvent.getInterval().get().equals(interval)) {
+                         updated.setTemporalInterval(null);
+                     }
+                 } else {
+                     final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+                     if(currentEvent.getInstant().isPresent() && currentEvent.getInstant().get().equals(instant)) {
+                         updated.setTemporalInstant(null);
+                     }
+                 }
+             }
+             return Optional.of(updated.build());
+         });
+     } catch (final IndexingException e) {
+         throw new IOException("Failed to update the Entity index.", e);
+     }
+ }
+
+ /**
+  * Creates or updates the {@link Event} stored for a subject with the value
+  * carried by a statement.
+  * <p>
+  * A geo statement sets the Event's geometry; a geometry that cannot be
+  * parsed is logged and the rest of the Event is still written. Any other
+  * statement is treated as temporal and sets the Event's interval or instant.
+  *
+  * @param subject - The subject of the Event to update.
+  * @param statement - The statement whose value is folded into the Event.
+  * @throws IndexingException The Event could not be updated in the storage.
+  * @throws ParseException Declared for callers; parse failures of the
+  *   geometry are logged inside this method.
+  */
+ private void updateEvent(final RyaURI subject, final RyaStatement statement) throws IndexingException, ParseException {
+     final EventStorage eventStore = events.get();
+     // Check the storage itself; the AtomicReference holding it is never null.
+     checkState(eventStore != null, "Must set this indexers configuration before storing statements.");
+
+     new EventUpdater(eventStore).update(subject, old -> {
+         final Event.Builder updated;
+         if(!old.isPresent()) {
+             updated = Event.builder()
+                     .setSubject(subject);
+         } else {
+             updated = Event.builder(old.get());
+         }
+
+         final URI pred = statement.getObject().getDataType();
+         if(pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) ||
+                 pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML)) {
+             //is geo
+             try {
+                 final Statement geoStatement = RyaToRdfConversions.convertStatement(statement);
+                 final Geometry geometry = GeoParseUtils.getGeometry(geoStatement);
+                 updated.setGeometry(geometry);
+             } catch (final ParseException e) {
+                 LOG.error(e.getMessage(), e);
+             }
+         } else {
+             //is time
+             final String dateTime = statement.getObject().getData();
+             final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime);
+             if (matcher.find()) {
+                 final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime);
+                 updated.setTemporalInterval(interval);
+             } else {
+                 final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime));
+                 updated.setTemporalInstant(instant);
+             }
+         }
+         return Optional.of(updated.build());
+     });
+ }
+
+ /**
+ * @return The table prefix from the configuration followed by
+ * {@link #GEO_TEMPORAL_COLLECTION}.
+ */
+ @Override
+ public String getCollectionName() {
+ return ConfigUtils.getTablePrefix(conf) + GEO_TEMPORAL_COLLECTION;
+ }
+
+ /**
+  * Returns the {@link EventStorage} of this indexer, creating and caching a
+  * {@link MongoEventStorage} when none is cached yet.
+  *
+  * @param conf - The configuration used to create the storage; it is cast
+  *   to {@link MongoDBRdfConfiguration}.
+  * @return The cached or newly created storage.
+  */
+ @Override
+ public EventStorage getEventStorage(final Configuration conf) {
+     final EventStorage cached = events.get();
+     if(cached != null) {
+         return cached;
+     }
+
+     final MongoDBRdfConfiguration mongoConf = (MongoDBRdfConfiguration) conf;
+     mongoClient = mongoConf.getMongoClient();
+     if (mongoClient == null) {
+         // The configuration carries no client, so one is obtained from the connector factory.
+         mongoClient = MongoConnectorFactory.getMongoClient(conf);
+     }
+
+     final EventStorage created = new MongoEventStorage(mongoClient, mongoConf.getMongoDBName());
+     events.set(created);
+     return created;
+ }
+}
[5/5] incubator-rya git commit: RYA-239 GeoTemporal tests added
Posted by mi...@apache.org.
RYA-239 GeoTemporal tests added
Added tests and fixes that were needed while
writing tests. Closes #138
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/646d21b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/646d21b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/646d21b4
Branch: refs/heads/master
Commit: 646d21b4e6ed9f7e010ffa8e75b2801f48456c1e
Parents: 440a4bf
Author: isper3at <sm...@gmail.com>
Authored: Fri Mar 31 13:38:30 2017 -0400
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Wed Jun 14 14:42:45 2017 -0400
----------------------------------------------------------------------
.../apache/rya/indexing/TemporalTupleSet.java | 2 -
.../rya/indexing/accumulo/ConfigUtils.java.orig | 529 +++++++++++++++++++
.../indexing/entity/update/EntityUpdater.java | 4 +-
.../rya/indexing/mongodb/MongoDbSmartUri.java | 9 +-
.../mongodb/update/DocumentUpdater.java | 98 ----
.../mongodb/update/MongoDocumentUpdater.java | 98 ++++
.../mongodb/update/RyaObjectStorage.java | 1 +
.../GeoEnabledFilterFunctionOptimizer.java | 47 +-
.../indexing/accumulo/geo/GeoParseUtils.java | 75 ++-
.../GeoTemporalIndexSetProvider.java | 46 +-
.../geotemporal/GeoTemporalIndexer.java | 61 ++-
.../geotemporal/model/EventQueryNode.java | 121 ++++-
.../geotemporal/mongo/EventUpdater.java | 4 +-
.../GeoTemporalMongoDBStorageStrategy.java | 3 +-
.../geotemporal/mongo/MongoEventStorage.java | 5 +-
.../mongo/MongoGeoTemporalIndexer.java | 6 +-
.../geotemporal/GeoTemporalProviderTest.java | 222 ++++++++
.../geotemporal/GeoTemporalTestBase.java | 140 +++++
.../geotemporal/MongoGeoTemporalIndexIT.java | 174 ++++++
.../geotemporal/model/EventQueryNodeTest.java | 368 +++++++++++++
.../mongo/EventDocumentConverterTest.java | 64 +++
.../GeoTemporalMongoDBStorageStrategyTest.java | 469 ++++++++++++++++
.../mongo/MongoEventStorageTest.java | 197 +++++++
.../mongo/MongoGeoTemporalIndexerIT.java | 126 +++++
.../indexing/geotemporal/mongo/MongoITBase.java | 81 +++
25 files changed, 2730 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
index 808afdf..7cb4e6c 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
@@ -126,10 +126,8 @@ public class TemporalTupleSet extends ExternalTupleSet {
public static class TemporalSearchFunctionFactory {
private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
private final TemporalIndexer temporalIndexer;
- Configuration conf;
public TemporalSearchFunctionFactory(final Configuration conf, final TemporalIndexer temporalIndexer) {
- this.conf = conf;
this.temporalIndexer = temporalIndexer;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
new file mode 100644
index 0000000..9311200
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.utils.ConnectorFactory;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.indexing.FilterFunctionOptimizer;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+import org.apache.rya.indexing.accumulo.entity.EntityOptimizer;
+import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import org.apache.rya.indexing.accumulo.freetext.LuceneTokenizer;
+import org.apache.rya.indexing.accumulo.freetext.Tokenizer;
+import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.apache.rya.indexing.entity.EntityIndexOptimizer;
+import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
+import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
+import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizer;
+import org.apache.rya.indexing.statement.metadata.matching.StatementMetadataOptimizer;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.URIImpl;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+/**
+ * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
+ * This class will soon be deprecated: use the installer for the set methods and {@link RyaDetails} for the get methods.
+ * New code must separate parameters that are set at Rya install time from those that are specific to the client.
+ * Also, Accumulo index tables are pushed down to the implementation and not configured in conf.
+ */
+public class ConfigUtils {
+ private static final Logger logger = Logger.getLogger(ConfigUtils.class);
+
+ /**
+ * @deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_TBL_PREFIX} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_TBL_PREFIX = RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_INSTANCE} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_INSTANCE = AccumuloRdfConfiguration.CLOUDBASE_INSTANCE;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_ZOOKEEPERS} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_ZOOKEEPERS = AccumuloRdfConfiguration.CLOUDBASE_ZOOKEEPERS;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_USER} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_USER = AccumuloRdfConfiguration.CLOUDBASE_USER;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_PASSWORD} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_PASSWORD = AccumuloRdfConfiguration.CLOUDBASE_PASSWORD;
+ /**
+ * @deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_QUERY_AUTH} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_AUTHS = RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH;
+
+ public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads";
+ public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency";
+ public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory";
+
+ public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit";
+
+ public static final String USE_FREETEXT = "sc.use_freetext";
+ public static final String USE_TEMPORAL = "sc.use_temporal";
+ public static final String USE_ENTITY = "sc.use_entity";
+ public static final String USE_PCJ = "sc.use_pcj";
+ public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
+ public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
+
+ public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+ public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
+ public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
+ public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+
+ public static final String USE_MOCK_INSTANCE = AccumuloRdfConfiguration.USE_MOCK_INSTANCE;
+
+ public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
+
+ private static final int WRITER_MAX_WRITE_THREADS = 1;
+ private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE;
+ private static final long WRITER_MAX_MEMORY = 10000L;
+
+ public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan";
+
+ public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates";
+ public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text";
+ public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term";
+
+ public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class";
+
+ public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
+
+ public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
+
+ public static final String USE_MONGO = "sc.useMongo";
+
+ public static boolean isDisplayQueryPlan(final Configuration conf) {
+ return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
+ }
+
+ /**
+ * get a value from the configuration file and throw an exception if the
+ * value does not exist.
+ *
+ * @param conf
+ * @param key
+ * @return
+ */
+ private static String getStringCheckSet(final Configuration conf, final String key) {
+ final String value = conf.get(key);
+ requireNonNull(value, key + " not set");
+ return value;
+ }
+
+ /**
+ * @param conf
+ * @param tablename
+ * @return if the table was created
+ * @throws AccumuloException
+ * @throws AccumuloSecurityException
+ * @throws TableExistsException
+ */
+ public static boolean createTableIfNotExists(final Configuration conf, final String tablename)
+ throws AccumuloException, AccumuloSecurityException, TableExistsException {
+ final TableOperations tops = getConnector(conf).tableOperations();
+ if (!tops.exists(tablename)) {
+ logger.info("Creating table: " + tablename);
+ tops.create(tablename);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Look up the table name prefix in the conf and throw an error if it is
+ * null. In the future, the table prefix (the Rya instance name) should be
+ * read from RyaDetails, and that lookup should happen within
+ * RyaSailFactory and not ConfigUtils.
+ *
+ * @param conf
+ * Rya configuration map where it extracts the prefix (instance
+ * name)
+ * @return index table prefix corresponding to this Rya instance
+ */
+ public static String getTablePrefix(final Configuration conf) {
+ final String tablePrefix;
+ tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+ requireNonNull(tablePrefix,
+ "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. Cannot generate table name.");
+ return tablePrefix;
+ }
+
+ public static int getFreeTextTermLimit(final Configuration conf) {
+ return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
+ }
+
+ public static Set<URI> getFreeTextPredicates(final Configuration conf) {
+ return getPredicates(conf, FREETEXT_PREDICATES_LIST);
+ }
+
+ public static Set<URI> getGeoPredicates(final Configuration conf) {
+ return getPredicates(conf, GEO_PREDICATES_LIST);
+ }
+
+ /**
+ * Used for indexing statements about date &amp; time instances and intervals.
+ *
+ * @param conf
+ * @return Set of predicate URIs whose objects should be date time
+ * literals.
+ */
+ public static Set<URI> getTemporalPredicates(final Configuration conf) {
+ return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
+ }
+
+ protected static Set<URI> getPredicates(final Configuration conf, final String confName) {
+ final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
+ final Set<URI> predicates = new HashSet<>();
+ for (final String prediateString : validPredicateStrings) {
+ predicates.add(new URIImpl(prediateString));
+ }
+ return predicates;
+ }
+
+ public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
+ final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
+ return ReflectionUtils.newInstance(c, conf);
+ }
+
+ public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf)
+ throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+ final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+ final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
+ return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
+ }
+
+ public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf)
+ throws AccumuloException, AccumuloSecurityException {
+ final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+ final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+ final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
+ return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
+ }
+
+ public static Scanner createScanner(final String tablename, final Configuration conf)
+ throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final Authorizations auths = ConfigUtils.getAuthorizations(conf);
+ return connector.createScanner(tablename, auths);
+
+ }
+
+ public static BatchScanner createBatchScanner(final String tablename, final Configuration conf)
+ throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final Authorizations auths = ConfigUtils.getAuthorizations(conf);
+ Integer numThreads = null;
+ if (conf instanceof RdfCloudTripleStoreConfiguration) {
+ numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
+ } else {
+ numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2);
+ }
+ return connector.createBatchScanner(tablename, auths, numThreads);
+ }
+
+ public static int getWriterMaxWriteThreads(final Configuration conf) {
+ return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
+ }
+
+ public static long getWriterMaxLatency(final Configuration conf) {
+ return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY);
+ }
+
+ public static long getWriterMaxMemory(final Configuration conf) {
+ return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
+ }
+
+ public static String getUsername(final JobContext job) {
+ return getUsername(job.getConfiguration());
+ }
+
+ /**
+ * Get the Accumulo username from the configuration object that is meant to
+ * be used when connecting a {@link Connector} to Accumulo.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return The username if one could be found; otherwise {@code null}.
+ */
+ public static String getUsername(final Configuration conf) {
+ return new AccumuloRdfConfiguration(conf).getUsername();
+ }
+
+ public static Authorizations getAuthorizations(final JobContext job) {
+ return getAuthorizations(job.getConfiguration());
+ }
+
+ public static Authorizations getAuthorizations(final Configuration conf) {
+ final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "");
+ if (authString.isEmpty()) {
+ return new Authorizations();
+ }
+ return new Authorizations(authString.split(","));
+ }
+
+ public static Instance getInstance(final JobContext job) {
+ return getInstance(job.getConfiguration());
+ }
+
+ /**
+ * Create an {@link Instance} that may be used to create {@link Connector}s
+ * to Accumulo. If the configuration has the {@link #USE_MOCK_INSTANCE} flag
+ * set, then the instance will be a {@link MockInstance} instead of a
+ * Zookeeper backed instance.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return The {@link Instance} that may be used to connect to Accumulo.
+ */
+ public static Instance getInstance(final Configuration conf) {
+ // Pull out the Accumulo specific configuration values.
+ final AccumuloRdfConfiguration accConf = new AccumuloRdfConfiguration(conf);
+ String instanceName = accConf.getInstanceName();
+ String zoookeepers = accConf.getZookeepers();
+
+ // Create a mock Instance if the mock flag is set.
+ if (useMockInstance(conf)) {
+ return new MockInstance(instanceName);
+ }
+
+ // Otherwise create an Instance to a Zookeeper managed instance of Accumulo.
+ return new ZooKeeperInstance(instanceName, zoookeepers);
+ }
+
+ public static String getPassword(final JobContext job) {
+ return getPassword(job.getConfiguration());
+ }
+
+ /**
+ * Get the Accumulo password from the configuration object that is meant to
+ * be used when connecting a {@link Connector} to Accumulo.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return The password if one could be found; otherwise an empty string.
+ */
+ public static String getPassword(final Configuration conf) {
+ return new AccumuloRdfConfiguration(conf).getPassword();
+ }
+
+ public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(job.getConfiguration());
+ }
+
+ /**
+ * Create an Accumulo {@link Connector} using the configured connection information.
+ * If the connection information points to a mock instance of Accumulo, then the
+ * {@link #USE_MOCK_INSTANCE} flag must be set.
+ *
+ * @param conf - Configures how the connector will be built. (not null)
+ * @return A {@link Connector} that may be used to interact with the configured Accumulo instance.
+ * @throws AccumuloException The connector couldn't be created because of an Accumulo problem.
+ * @throws AccumuloSecurityException The connector couldn't be created because of an Accumulo security violation.
+ */
+ public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
+ return ConnectorFactory.connect( new AccumuloRdfConfiguration(conf) );
+ }
+
+ /**
+ * Indicates that a Mock instance of Accumulo is being used to back the Rya instance.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}.
+ */
+ public static boolean useMockInstance(final Configuration conf) {
+ return new AccumuloRdfConfiguration(conf).useMockInstance();
+ }
+
+ protected static int getNumPartitions(final Configuration conf) {
+ return conf.getInt(NUM_PARTITIONS, 25);
+ }
+
+ public static int getFreeTextDocNumPartitions(final Configuration conf) {
+ return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
+ }
+
+ public static int getFreeTextTermNumPartitions(final Configuration conf) {
+ return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
+ }
+
+ public static boolean getUseFreeText(final Configuration conf) {
+ return conf.getBoolean(USE_FREETEXT, false);
+ }
+
+ public static boolean getUseTemporal(final Configuration conf) {
+ return conf.getBoolean(USE_TEMPORAL, false);
+ }
+
+ public static boolean getUseEntity(final Configuration conf) {
+ return conf.getBoolean(USE_ENTITY, false);
+ }
+
+ public static boolean getUsePCJ(final Configuration conf) {
+ return conf.getBoolean(USE_PCJ, false);
+ }
+
+ public static boolean getUseOptimalPCJ(final Configuration conf) {
+ return conf.getBoolean(USE_OPTIMAL_PCJ, false);
+ }
+
+ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
+ return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
+ }
+
+
+ /**
+ * @return The name of the Fluo Application this instance of RYA is using to
+ * incrementally update PCJs.
+ */
+ // TODO delete this eventually and use Details table
+ public Optional<String> getFluoAppName(final Configuration conf) {
+ return Optional.fromNullable(conf.get(FLUO_APP_NAME));
+ }
+
+
+ public static boolean getUseMongo(final Configuration conf) {
+ return conf.getBoolean(USE_MONGO, false);
+ }
+
+
+ public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
+
+ final List<String> indexList = Lists.newArrayList();
+ final List<String> optimizers = Lists.newArrayList();
+
+ boolean useFilterIndex = false;
+
+ if (ConfigUtils.getUseMongo(conf)) {
+ if (getUseFreeText(conf)) {
+ indexList.add(MongoFreeTextIndexer.class.getName());
+ useFilterIndex = true;
+ }
+
+ if (getUseEntity(conf)) {
+ indexList.add(MongoEntityIndexer.class.getName());
+ optimizers.add(EntityIndexOptimizer.class.getName());
+ }
+
+ if (getUseTemporal(conf)) {
+ indexList.add(MongoTemporalIndexer.class.getName());
+ useFilterIndex = true;
+ }
+ } else {
+ if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
+ conf.setPcjOptimizer(PCJOptimizer.class);
+ }
+
+ if (getUsePcjUpdaterIndex(conf)) {
+ indexList.add(PrecomputedJoinIndexer.class.getName());
+ }
+
+ if (getUseFreeText(conf)) {
+ indexList.add(AccumuloFreeTextIndexer.class.getName());
+ useFilterIndex = true;
+ }
+
+ if (getUseTemporal(conf)) {
+ indexList.add(AccumuloTemporalIndexer.class.getName());
+ useFilterIndex = true;
+ }
+
+ if (getUseEntity(conf)) {
+ indexList.add(EntityCentricIndex.class.getName());
+ optimizers.add(EntityOptimizer.class.getName());
+ }
+ }
+
+ if (useFilterIndex) {
+ optimizers.add(FilterFunctionOptimizer.class.getName());
+ }
+
+ if (conf.getUseStatementMetadata()) {
+ optimizers.add(StatementMetadataOptimizer.class.getName());
+ }
+
+<<<<<<< HEAD
+ conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[] {}));
+ conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[] {}));
+ }
+}
+=======
+ final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS);
+ if(existingIndexers != null ) {
+ for(final String idx : existingIndexers) {
+ indexList.add(idx);
+ }
+ }
+
+ final String[] existingOptimizers = conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS);
+ if(existingOptimizers != null ) {
+ for(final String opt : existingOptimizers) {
+ optimizers.add(opt);
+ }
+ }
+
+ conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
+ conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
+ }
+
+
+
+}
+>>>>>>> RYA-236 Changes to other indexers
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
index 2edbe37..91f1146 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
@@ -26,7 +26,7 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.entity.model.Entity;
import org.apache.rya.indexing.entity.storage.EntityStorage;
import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
-import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.MongoDocumentUpdater;
import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Performs update operations over an {@link EntityStorage}.
*/
@DefaultAnnotation(NonNull.class)
-public class EntityUpdater implements DocumentUpdater<RyaURI, Entity>{
+public class EntityUpdater implements MongoDocumentUpdater<RyaURI, Entity>{
private final EntityStorage storage;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
index 9fdfad6..cbc8796 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
@@ -36,6 +36,7 @@ import org.apache.rya.indexing.entity.storage.EntityStorage;
import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor;
import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import org.apache.rya.indexing.smarturi.SmartUriAdapter;
import org.apache.rya.indexing.smarturi.SmartUriException;
import org.apache.rya.indexing.smarturi.SmartUriStorage;
@@ -74,7 +75,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
// Create it.
try {
entityStorage.create(entity);
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to create entity storage", e);
}
}
@@ -86,7 +87,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
// Create it.
try {
entityStorage.create(entity);
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to create entity storage", e);
}
}
@@ -98,7 +99,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
// Update it.
try {
entityStorage.update(oldEntity, updatedEntity);
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to update entity", e);
}
}
@@ -111,7 +112,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
try {
final Optional<Entity> resultEntity = entityStorage.get(subject);
return resultEntity.get();
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to query entity storage", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
deleted file mode 100644
index 0b9db13..0000000
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.mongodb.update;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Optional;
-import java.util.function.Function;
-
-import org.apache.rya.indexing.mongodb.IndexingException;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Performs an update operation on a Document in mongodb.
- * @param <T> - The key to find the object.
- * @param <V> - The type of object to get updated.
- */
-@DefaultAnnotation(NonNull.class)
-public interface DocumentUpdater<T, V> {
- public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException {
- requireNonNull(mutator);
-
- // Fetch the current state of the Entity.
- boolean completed = false;
- while(!completed) {
- //this cast is safe since the mutator interface is defined below to use Optional<V>
- final Optional<V> old = getOld(key);
- final Optional<V> updated = mutator.apply(old);
-
- final boolean doWork = updated.isPresent();
- if(doWork) {
- if(!old.isPresent()) {
- create(updated.get());
- } else {
- update(old.get(), updated.get());
- }
- }
- completed = true;
- }
- }
-
- Optional<V> getOld(T key) throws IndexingException;
-
- void create(final V newObj) throws IndexingException;
-
- void update(final V old, final V updated) throws IndexingException;
-
- /**
- * Implementations of this interface are used to update the state of a
- * {@link DocumentUpdater#V} in unison with a {@link DocumentUpdater}.
- * </p>
- * This table describes what the updater will do depending on if the object
- * exists and if an updated object is returned.
- * </p>
- * <table border="1px">
- * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr>
- * <tr>
- * <td>true</td>
- * <td>true</td>
- * <td>The old Object will be updated using the returned state.</td>
- * </tr>
- * <tr>
- * <td>true</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>true</td>
- * <td>A new Object will be created using the returned state.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * </table>
- */
- public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java
new file mode 100644
index 0000000..a7a3eb9
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.update;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.apache.rya.indexing.mongodb.IndexingException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Performs an update operation on a Document in mongodb.
+ * @param <T> - The key to find the object.
+ * @param <V> - The type of object to get updated.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface MongoDocumentUpdater<T, V> {
+ public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException {
+ requireNonNull(mutator);
+
+ // Fetch the current state of the Entity.
+ boolean completed = false;
+ while(!completed) {
+ // Fetch the current state and let the mutator decide the updated state (empty means no work).
+ final Optional<V> old = getOld(key);
+ final Optional<V> updated = mutator.apply(old);
+
+ final boolean doWork = updated.isPresent();
+ if(doWork) {
+ if(!old.isPresent()) {
+ create(updated.get());
+ } else {
+ update(old.get(), updated.get());
+ }
+ }
+ completed = true;
+ }
+ }
+
+ Optional<V> getOld(T key) throws IndexingException;
+
+ void create(final V newObj) throws IndexingException;
+
+ void update(final V old, final V updated) throws IndexingException;
+
+ /**
+ * Implementations of this interface are used to update the state of a
+ * {@code V} in unison with a {@link MongoDocumentUpdater}.
+ * <p>
+ * This table describes what the updater will do depending on if the object
+ * exists and if an updated object is returned.
+ * </p>
+ * <table border="1px">
+ * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr>
+ * <tr>
+ * <td>true</td>
+ * <td>true</td>
+ * <td>The old Object will be updated using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>true</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>true</td>
+ * <td>A new Object will be created using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * </table>
+ */
+ public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
index 10feb0d..bd04368 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
@@ -25,6 +25,7 @@ import org.apache.rya.indexing.mongodb.IndexingException;
/**
* Stores and provides access to objects of type T.
+ * The RyaURI subject is the primary storage key used.
* @param <T> - The type of object to store/access.
*/
public interface RyaObjectStorage<T> {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
index d773831..bf6b632 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
@@ -34,6 +34,18 @@ import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import org.apache.rya.indexing.accumulo.freetext.FreeTextTupleSet;
+import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
+import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
+import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer;
+import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
import org.geotools.feature.SchemaException;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
@@ -52,25 +64,12 @@ import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.ValueConstant;
-import org.openrdf.query.algebra.ValueExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.evaluation.QueryOptimizer;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import com.google.common.collect.Lists;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import org.apache.rya.indexing.accumulo.freetext.FreeTextTupleSet;
-import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
-import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
-import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
-import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer;
-import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
-
public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Configurable {
private static final Logger LOG = Logger.getLogger(GeoEnabledFilterFunctionOptimizer.class);
private final ValueFactory valueFactory = new ValueFactoryImpl();
@@ -232,7 +231,7 @@ public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Config
final URI fnUri = valueFactory.createURI(call.getURI());
final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(fnUri, call.getArgs());
if (resultVar != null && resultVar.getName().equals(matchVar)) {
- addFilter(valueFactory.createURI(call.getURI()), extractArguments(matchVar, call));
+ addFilter(valueFactory.createURI(call.getURI()), GeoParseUtils.extractArguments(matchVar, call));
if (call.getParentNode() instanceof Filter || call.getParentNode() instanceof And || call.getParentNode() instanceof LeftJoin) {
call.replaceWith(new ValueConstant(valueFactory.createLiteral(true)));
} else {
@@ -241,26 +240,6 @@ public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Config
}
}
- private Value[] extractArguments(final String matchName, final FunctionCall call) {
- final Value args[] = new Value[call.getArgs().size() - 1];
- int argI = 0;
- for (int i = 0; i != call.getArgs().size(); ++i) {
- final ValueExpr arg = call.getArgs().get(i);
- if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
- continue;
- }
- if (arg instanceof ValueConstant) {
- args[argI] = ((ValueConstant)arg).getValue();
- } else if (arg instanceof Var && ((Var)arg).hasValue()) {
- args[argI] = ((Var)arg).getValue();
- } else {
- throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
- }
- ++argI;
- }
- return args;
- }
-
@Override
public void meet(final Filter filter) {
//First visit children, then condition (reverse of default):
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
index ffba225..e8fbc3d 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
@@ -14,9 +14,9 @@ import javax.xml.parsers.ParserConfigurationException;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,21 +27,25 @@ import javax.xml.parsers.ParserConfigurationException;
import org.apache.log4j.Logger;
+import org.apache.rya.indexing.GeoConstants;
import org.geotools.gml3.GMLConfiguration;
import org.geotools.xml.Parser;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
import org.xml.sax.SAXException;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.io.WKTReader;
-import org.apache.rya.indexing.GeoConstants;
-
public class GeoParseUtils {
static final Logger logger = Logger.getLogger(GeoParseUtils.class);
- /**
+ /**
* @deprecated Not needed since geo literals may be WKT or GML.
*
* This method warns on a condition that must already be tested. Replaced by
@@ -50,41 +54,41 @@ public class GeoParseUtils {
* and getLiteral(statement).getDatatype()
*/
@Deprecated
- public static String getWellKnownText(Statement statement) throws ParseException {
- Literal lit = getLiteral(statement);
+ public static String getWellKnownText(final Statement statement) throws ParseException {
+ final Literal lit = getLiteral(statement);
if (!GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) {
logger.warn("Literal is not of type " + GeoConstants.XMLSCHEMA_OGC_WKT + ": " + statement.toString());
}
return lit.getLabel().toString();
}
- public static Literal getLiteral(Statement statement) throws ParseException {
- org.openrdf.model.Value v = statement.getObject();
+ public static Literal getLiteral(final Statement statement) throws ParseException {
+ final org.openrdf.model.Value v = statement.getObject();
if (!(v instanceof Literal)) {
throw new ParseException("Statement does not contain Literal: " + statement.toString());
}
- Literal lit = (Literal) v;
+ final Literal lit = (Literal) v;
return lit;
}
/**
* Parse GML/wkt literal to Geometry
- *
+ *
* @param statement
* @return
* @throws ParseException
- * @throws ParserConfigurationException
- * @throws SAXException
- * @throws IOException
+ * @throws ParserConfigurationException
+ * @throws SAXException
+ * @throws IOException
*/
- public static Geometry getGeometry(Statement statement) throws ParseException {
+ public static Geometry getGeometry(final Statement statement) throws ParseException {
// handle GML or WKT
- Literal lit = getLiteral(statement);
+ final Literal lit = getLiteral(statement);
if (GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) {
final String wkt = lit.getLabel().toString();
return (new WKTReader()).read(wkt);
} else if (GeoConstants.XMLSCHEMA_OGC_GML.equals(lit.getDatatype())) {
- String gml = lit.getLabel().toString();
+ final String gml = lit.getLabel().toString();
try {
return getGeometryGml(gml);
} catch (IOException | SAXException | ParserConfigurationException e) {
@@ -102,18 +106,43 @@ public class GeoParseUtils {
* @throws SAXException
* @throws ParserConfigurationException
*/
- public static Geometry getGeometryGml(String gmlString) throws IOException, SAXException, ParserConfigurationException {
- Reader reader = new StringReader(gmlString);
- GMLConfiguration gmlConfiguration = new GMLConfiguration();
- Parser gmlParser = new Parser(gmlConfiguration);
+ public static Geometry getGeometryGml(final String gmlString) throws IOException, SAXException, ParserConfigurationException {
+ final Reader reader = new StringReader(gmlString);
+ final GMLConfiguration gmlConfiguration = new GMLConfiguration();
+ final Parser gmlParser = new Parser(gmlConfiguration);
// gmlParser.setStrict(false); // attempt at allowing deprecated elements, but no.
// gmlParser.setValidating(false);
final Geometry geometry = (Geometry) gmlParser.parse(reader);
// This sometimes gets populated with the SRS/CRS: geometry.getUserData()
- // Always returns 0 : geometry.getSRID()
+ // Always returns 0 : geometry.getSRID()
//TODO geometry.setUserData(some default CRS); OR geometry.setSRID(some default CRS)
-
+
return geometry;
}
+ /**
+ * Extracts the arguments used in a {@link FunctionCall}.
+ * @param matchName - The variable name to match to arguments used in the {@link FunctionCall}.
+ * @param call - The {@link FunctionCall} to match against.
+ * @return - The {@link Value}s matched.
+ */
+ public static Value[] extractArguments(final String matchName, final FunctionCall call) {
+ final Value args[] = new Value[call.getArgs().size() - 1];
+ int argI = 0;
+ for (int i = 0; i != call.getArgs().size(); ++i) {
+ final ValueExpr arg = call.getArgs().get(i);
+ if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
+ continue;
+ }
+ if (arg instanceof ValueConstant) {
+ args[argI] = ((ValueConstant)arg).getValue();
+ } else if (arg instanceof Var && ((Var)arg).hasValue()) {
+ args[argI] = ((Var)arg).getValue();
+ } else {
+ throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
+ }
+ ++argI;
+ }
+ return args;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
index 38790c4..bf12f26 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
@@ -28,22 +28,22 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import org.apache.log4j.Logger;
import org.apache.rya.indexing.IndexingExpr;
import org.apache.rya.indexing.IndexingFunctionRegistry;
import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
import org.apache.rya.indexing.external.matching.ExternalSetProvider;
import org.apache.rya.indexing.external.matching.QuerySegment;
import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode.EventQueryNodeBuilder;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
import org.openrdf.model.URI;
-import org.openrdf.model.Value;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.algebra.FunctionCall;
import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.ValueConstant;
-import org.openrdf.query.algebra.ValueExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
@@ -54,6 +54,8 @@ import com.google.common.collect.Multimap;
* Provides {@link GeoTupleSet}s.
*/
public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQueryNode> {
+ private static final Logger LOG = Logger.getLogger(GeoTemporalIndexSetProvider.class);
+
//organized by object var. Each object is a filter, or set of filters
private Multimap<Var, IndexingExpr> filterMap;
@@ -138,13 +140,20 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
}
if(geoFilters.isPresent() && temporalFilters.isPresent() && geoPattern.isPresent() && temporalPattern.isPresent()) {
- return new EventQueryNode(eventStorage, geoPattern.get(), temporalPattern.get(), geoFilters.get(), temporalFilters.get(), usedFilters);
+ return new EventQueryNodeBuilder()
+ .setStorage(eventStorage)
+ .setGeoPattern(geoPattern.get())
+ .setTemporalPattern(temporalPattern.get())
+ .setGeoFilters(geoFilters.get())
+ .setTemporalFilters(temporalFilters.get())
+ .setUsedFilters(usedFilters)
+ .build();
} else {
return null;
}
}
- private FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) {
+ private static FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) {
FUNCTION_TYPE type = null;
for(final IndexingExpr filter : filters) {
if(type == null) {
@@ -174,7 +183,7 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
try {
filter.visit(new FilterVisitor());
} catch (final Exception e) {
- e.printStackTrace();
+ LOG.error("Failed to match the filter object.", e);
}
}
@@ -204,27 +213,7 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
private void addFilter(final FunctionCall call) {
filterURI = new URIImpl(call.getURI());
final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
- filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call)));
- }
-
- private Value[] extractArguments(final String matchName, final FunctionCall call) {
- final Value args[] = new Value[call.getArgs().size() - 1];
- int argI = 0;
- for (int i = 0; i != call.getArgs().size(); ++i) {
- final ValueExpr arg = call.getArgs().get(i);
- if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
- continue;
- }
- if (arg instanceof ValueConstant) {
- args[argI] = ((ValueConstant)arg).getValue();
- } else if (arg instanceof Var && ((Var)arg).hasValue()) {
- args[argI] = ((Var)arg).getValue();
- } else {
- throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
- }
- ++argI;
- }
- return args;
+ filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call)));
}
/**
@@ -234,13 +223,12 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
private class FilterVisitor extends QueryModelVisitorBase<Exception> {
@Override
public void meet(final FunctionCall call) throws Exception {
-
filterURI = new URIImpl(call.getURI());
final FUNCTION_TYPE type = IndexingFunctionRegistry.getFunctionType(filterURI);
if(type == FUNCTION_TYPE.GEO || type == FUNCTION_TYPE.TEMPORAL) {
final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
if(objectPatterns.containsKey(objVar)) {
- filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call)));
+ filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call)));
matchedFilters.put(objVar, call);
} else {
unmatchedFilters.put(objVar, call);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
index 01b254b..cbc978b 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
@@ -39,14 +39,50 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
*/
public abstract EventStorage getEventStorage(final Configuration conf);
- public enum GeoPolicy {
+ /**
+ * Used to indicate which geo filter functions to use in a query.
+ */
+ public static enum GeoPolicy {
+ /**
+ * The provided geo object equals the geo object where the event took place.
+ */
EQUALS(GeoConstants.GEO_SF_EQUALS),
+
+ /**
+ * The provided geo object does not share any space with the event.
+ */
DISJOINT(GeoConstants.GEO_SF_DISJOINT),
+
+ /**
+ * The provided geo object shares some amount of space with the event.
+ */
INTERSECTS(GeoConstants.GEO_SF_INTERSECTS),
+
+ /**
+ * The provided geo object shares a point with the event, but only on the edge.
+ */
TOUCHES(GeoConstants.GEO_SF_TOUCHES),
+
+ /**
+ * The provided geo object shares some, but not all space with the event.
+ */
CROSSES(GeoConstants.GEO_SF_CROSSES),
+
+ /**
+ * The provided geo object exists completely within the event.
+ */
WITHIN(GeoConstants.GEO_SF_WITHIN),
+
+ /**
+ * The event took place completely within the provided geo object.
+ */
CONTAINS(GeoConstants.GEO_SF_CONTAINS),
+
+ /**
+ * The provided geo object has some but not all points in common with the event,
+ * are of the same dimension, and the intersection of the interiors has the
+ * same dimension as the geometries themselves.
+ */
OVERLAPS(GeoConstants.GEO_SF_OVERLAPS);
private final URI uri;
@@ -69,10 +105,9 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
}
}
- String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
+ static final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
/**
- * All of the filter functions that can be used in a temporal based query.
- * <p>
+ * Used to indicate which temporal filter functions to use in a query.
*/
public enum TemporalPolicy {
/**
@@ -106,12 +141,28 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
INSTANT_AFTER_INTERVAL(false, new URIImpl(TEMPORAL_NS+"afterInterval")),
/**
- * The provided instant in time equals the instant the event took place.
+ * The provided instant in time equals the start of the interval in which the event took place.
*/
INSTANT_START_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasBeginningInterval")),
+
+ /**
+ * The provided instant in time equals the end of the interval in which the event took place.
+ */
INSTANT_END_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasEndInterval")),
+
+ /**
+ * The provided interval equals the interval in which the event took place.
+ */
INTERVAL_EQUALS(false, new URIImpl(TEMPORAL_NS+"intervalEquals")),
+
+ /**
+ * The provided interval is before the interval in which the event took place.
+ */
INTERVAL_BEFORE(false, new URIImpl(TEMPORAL_NS+"intervalBefore")),
+
+ /**
+ * The provided interval is after the interval in which the event took place.
+ */
INTERVAL_AFTER(false, new URIImpl(TEMPORAL_NS+"intervalAfter"));
private final boolean isInstant;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
index 6953714..104fca8 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
@@ -29,12 +29,17 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstant;
import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.entity.query.EntityQueryNode;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.openrdf.model.Value;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.BindingSet;
@@ -60,8 +65,10 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
//Information about the subject of the patterns.
private final boolean subjectIsConstant;
- private final Optional<String> subjectConstant;
private final Optional<String> subjectVar;
+ //not final because if the subject is a variable and the evaluate() is
+ // provided a binding set that contains the subject, this optional is used.
+ private Optional<String> subjectConstant;
//since an EventQueryNode exists in a single segment, all binding names are guaranteed to be assured.
private final Set<String> bindingNames;
@@ -80,7 +87,7 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
* @param entities - The {@link EventStorage} that will be searched to match
* {@link BindingSet}s when evaluating a query. (not null)
*/
- public EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException {
+ private EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException {
this.geoPattern = requireNonNull(geoPattern);
this.temporalPattern = requireNonNull(temporalPattern);
this.geoFilters = requireNonNull(geoFilters);
@@ -159,8 +166,15 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
try {
final Collection<Event> searchEvents;
final String subj;
+ //if the provided binding set has the subject already, set it to the constant subject.
+ if(!subjectConstant.isPresent() && bindings.hasBinding(subjectVar.get())) {
+ subjectConstant = Optional.of(bindings.getValue(subjectVar.get()).stringValue());
+ } else if(bindings.size() != 0) {
+ list.add(bindings);
+ }
+
// If the subject needs to be filled in, check if the subject variable is in the binding set.
- if(subjectIsConstant) {
+ if(subjectConstant.isPresent()) {
// if it is, fetch that value and then fetch the entity for the subject.
subj = subjectConstant.get();
searchEvents = eventStore.search(Optional.of(new RyaURI(subj)), Optional.of(geoFilters), Optional.of(temporalFilters));
@@ -179,7 +193,11 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
final Value temporalValue;
if(event.isInstant() && event.getInstant().isPresent()) {
- temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInstant().get().getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER));
+ final Optional<TemporalInstant> opt = event.getInstant();
+ DateTime dt = opt.get().getAsDateTime();
+ dt = dt.toDateTime(DateTimeZone.UTC);
+ final String str = dt.toString(TemporalInstantRfc3339.FORMATTER);
+ temporalValue = ValueFactoryImpl.getInstance().createLiteral(str);
} else if(event.getInterval().isPresent()) {
temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInterval().get().getAsPair());
} else {
@@ -195,9 +213,6 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
} catch (final ObjectStorageException e) {
throw new QueryEvaluationException("Failed to evaluate the binding set", e);
}
- if(bindings.size() != 0) {
- list.add(bindings);
- }
return new CollectionIteration<>(list);
}
@@ -238,15 +253,16 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
public boolean equals(final Object other) {
if(other instanceof EventQueryNode) {
final EventQueryNode otherNode = (EventQueryNode)other;
-
- return Objects.equals(subjectIsConstant, otherNode.subjectIsConstant) &&
- Objects.equals(subjectVar, otherNode.subjectVar) &&
- Objects.equals(geoFilters, otherNode.geoFilters) &&
- Objects.equals(geoPattern, otherNode.geoPattern) &&
- Objects.equals(temporalFilters, otherNode.temporalFilters) &&
- Objects.equals(temporalPattern, otherNode.temporalPattern) &&
- Objects.equals(bindingNames, otherNode.bindingNames) &&
- Objects.equals(subjectConstant, otherNode.subjectConstant);
+ return new EqualsBuilder()
+ .append(subjectIsConstant, otherNode.subjectIsConstant)
+ .append(subjectVar, otherNode.subjectVar)
+ .append(geoFilters, otherNode.geoFilters)
+ .append(geoPattern, otherNode.geoPattern)
+ .append(temporalFilters, otherNode.temporalFilters)
+ .append(temporalPattern, otherNode.temporalPattern)
+ .append(bindingNames, otherNode.bindingNames)
+ .append(subjectConstant, otherNode.subjectConstant)
+ .isEquals();
}
return false;
}
@@ -280,4 +296,77 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
throws QueryEvaluationException {
return null;
}
+
+ /**
+ * Builder for {@link EventQueryNode}s.
+ */
+ public static class EventQueryNodeBuilder {
+ private EventStorage store;
+ private StatementPattern geoPattern;
+ private StatementPattern temporalPattern;
+ private Collection<IndexingExpr> geoFilters;
+ private Collection<IndexingExpr> temporalFilters;
+ private Collection<FunctionCall> usedFilters;
+
+ /**
+ * @param store - The {@link EventStorage} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setStorage(final EventStorage store) {
+ this.store = store;
+ return this;
+ }
+
+ /**
+ * @param geoPattern - The geo {@link StatementPattern} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setGeoPattern(final StatementPattern geoPattern) {
+ this.geoPattern = geoPattern;
+ return this;
+ }
+
+ /**
+ * @param temporalPattern - The temporal {@link StatementPattern} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setTemporalPattern(final StatementPattern temporalPattern) {
+ this.temporalPattern = temporalPattern;
+ return this;
+ }
+
+ /**
+ * @param geoFilters - The geo filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setGeoFilters(final Collection<IndexingExpr> geoFilters) {
+ this.geoFilters = geoFilters;
+ return this;
+ }
+
+ /**
+ * @param temporalFilters - The temporal filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setTemporalFilters(final Collection<IndexingExpr> temporalFilters) {
+ this.temporalFilters = temporalFilters;
+ return this;
+ }
+
+ /**
+ * @param usedFilters - The filter(s) used by the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setUsedFilters(final Collection<FunctionCall> usedFilters) {
+ this.usedFilters = usedFilters;
+ return this;
+ }
+
+ /**
+ * @return The {@link EventQueryNode} built by the builder.
+ */
+ public EventQueryNode build() {
+ return new EventQueryNode(store, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
index c9f4658..1c62407 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
@@ -26,7 +26,7 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.geotemporal.model.Event;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException;
-import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.MongoDocumentUpdater;
import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Performs update operations over an {@link EventStorage}.
*/
@DefaultAnnotation(NonNull.class)
-public class EventUpdater implements DocumentUpdater<RyaURI, Event>{
+public class EventUpdater implements MongoDocumentUpdater<RyaURI, Event>{
private final EventStorage events;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
index 352dcb6..ab44ffe 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
@@ -63,7 +63,8 @@ import com.vividsolutions.jts.io.WKTReader;
import jline.internal.Log;
/**
- * TODO: doc
+ * Storage adapter for serializing Geo Temporal statements into mongo objects.
+ * This includes adapting the {@link IndexingExpr}s for the GeoTemporal indexer.
*/
public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
index 8ddf075..9c13c8b 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
@@ -31,10 +31,10 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.IndexingExpr;
import org.apache.rya.indexing.entity.model.TypedEntity;
import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
import org.apache.rya.indexing.geotemporal.model.Event;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
-import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.Document;
@@ -138,9 +138,8 @@ public class MongoEventStorage implements EventStorage {
.iterator();
final List<Event> events = new ArrayList<>();
- final EventDocumentConverter adapter = new EventDocumentConverter();
while(results.hasNext()) {
- events.add(adapter.fromDocument(results.next()));
+ events.add(EVENT_CONVERTER.fromDocument(results.next()));
}
return events;
} catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
index 1baab18..34df399 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
@@ -206,12 +206,16 @@ public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMon
@Override
public EventStorage getEventStorage(final Configuration conf) {
+ requireNonNull(conf);
+
if(events.get() != null) {
return events.get();
}
- final MongoDBRdfConfiguration mongoConf = (MongoDBRdfConfiguration) conf;
+
+ final MongoDBRdfConfiguration mongoConf = new MongoDBRdfConfiguration(conf);
mongoClient = mongoConf.getMongoClient();
+ configuration.set(mongoConf);
if (mongoClient == null) {
mongoClient = MongoConnectorFactory.getMongoClient(conf);
}
[4/5] incubator-rya git commit: RYA-239 GeoTemporal tests added
Posted by mi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java
new file mode 100644
index 0000000..7151b56
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexSetProvider;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+public class GeoTemporalProviderTest extends GeoTemporalTestBase {
+ private static final String URI_PROPERTY_AT_TIME = "Property:atTime";
+ private GeoTemporalIndexSetProvider provider;
+ private EventStorage events;
+ @Before
+ public void setup() {
+ events = mock(EventStorage.class);
+ provider = new GeoTemporalIndexSetProvider(events);
+ }
+
+ /*
+ * Simplest Happy Path test
+ */
+ @Test
+ public void twoPatternsTwoFilters_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+ " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+ " FILTER(time:equals(?time, " + temp + ")) . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(1, nodes.size());
+ }
+
+ @Test
+ public void onePatternTwoFilters_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+ " FILTER(time:equals(?time, " + temp + ")) . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(0, nodes.size());
+ }
+
+ @Test
+ public void twoPatternsOneFilter_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+ " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(0, nodes.size());
+ }
+
+ @Test
+ public void twoPatternsNoFilter_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(0, nodes.size());
+ }
+
+ @Test
+ public void twoPatternsTwoFiltersNotValid_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ //The provider only handles geo and temporal filters, so the text: filter below should produce no nodes.
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX text: <http://rdf.useekm.com/fts#text>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+ " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+ " FILTER(text:equals(?time, " + temp + ")) . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(0, nodes.size());
+ }
+
+ @Test
+ public void twoSubjOneFilter_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+ "?subj2 <" + tempPred + "> ?time2 ."+
+ "?subj2 <" + GeoConstants.GEO_AS_WKT + "> ?loc2 . " +
+ " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+ " FILTER(time:equals(?time, " + temp + ")) . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(1, nodes.size());
+ }
+
+ @Test
+ public void twoNode_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+ "?subj2 <" + tempPred + "> ?time2 ."+
+ "?subj2 <" + GeoConstants.GEO_AS_WKT + "> ?loc2 . " +
+ " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+ " FILTER(time:equals(?time, " + temp + ")) . " +
+ " FILTER(geos:sfContains(?loc2, " + geo + ")) . " +
+ " FILTER(time:equals(?time2, " + temp + ")) . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(2, nodes.size());
+ }
+
+ @Test
+ public void twoSubjectMultiFilter_test() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME);
+ final String query =
+ "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" +
+ "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" +
+ "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" +
+ "SELECT * WHERE { " +
+ "?subj <" + tempPred + "> ?time ."+
+ "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " +
+ " FILTER(geos:sfContains(?loc, " + geo + ")) . " +
+ " FILTER(time:equals(?time, " + temp + ")) . " +
+ " FILTER(geos:sfWithin(?loc, " + geo + ")) . " +
+ " FILTER(time:before(?time, " + temp + ")) . " +
+ "}";
+ final QuerySegment<EventQueryNode> node = getQueryNode(query);
+ final List<EventQueryNode> nodes = provider.getExternalSets(node);
+ assertEquals(1, nodes.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java
new file mode 100644
index 0000000..6b6bf15
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.junit.ComparisonFailure;
+import org.mockito.Mockito;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.LineString;
+import com.vividsolutions.jts.geom.LinearRing;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.Polygon;
+import com.vividsolutions.jts.geom.PrecisionModel;
+import com.vividsolutions.jts.geom.impl.PackedCoordinateSequence;
+
+public class GeoTemporalTestBase {
+ private static final GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326);
+
+ /**
+ * Make an instant on 2015-12-30 at 12:00 that differs only in the given seconds.
+ */
+ protected static TemporalInstant makeInstant(final int secondsMakeMeUnique) {
+ return new TemporalInstantRfc3339(2015, 12, 30, 12, 00, secondsMakeMeUnique);
+ }
+
+ protected static Polygon poly(final double[] arr) {
+ final LinearRing r1 = gf.createLinearRing(new PackedCoordinateSequence.Double(arr, 2));
+ final Polygon p1 = gf.createPolygon(r1, new LinearRing[] {});
+ return p1;
+ }
+
+ protected static Point point(final double x, final double y) {
+ return gf.createPoint(new Coordinate(x, y));
+ }
+
+ protected static LineString line(final double x1, final double y1, final double x2, final double y2) {
+ return new LineString(new PackedCoordinateSequence.Double(new double[] { x1, y1, x2, y2 }, 2), gf);
+ }
+
+ protected static double[] bbox(final double x1, final double y1, final double x2, final double y2) {
+ return new double[] { x1, y1, x1, y2, x2, y2, x2, y1, x1, y1 };
+ }
+
+ protected void assertEqualMongo(final Object expected, final Object actual) throws ComparisonFailure {
+ try {
+ assertEquals(expected, actual);
+ } catch(final Throwable e) {
+ throw new ComparisonFailure(e.getMessage(), expected.toString(), actual.toString());
+ }
+ }
+
+ public List<FunctionCall> getFilters(final String query) throws Exception {
+ final FunctionCallCollector collector = new FunctionCallCollector();
+ new SPARQLParser().parseQuery(query, null).getTupleExpr().visit(collector);
+ return collector.getTupleExpr();
+ }
+
+ public List<StatementPattern> getSps(final String query) throws Exception {
+ final StatementPatternCollector collector = new StatementPatternCollector();
+ new SPARQLParser().parseQuery(query, null).getTupleExpr().visit(collector);
+ return collector.getStatementPatterns();
+ }
+
+ public QuerySegment<EventQueryNode> getQueryNode(final String query) throws Exception {
+ final List<QueryModelNode> exprs = getNodes(query);
+ final QuerySegment<EventQueryNode> node = Mockito.mock(QuerySegment.class);
+ //The provider only cares about the ordered nodes, so that is the only call stubbed on the mock.
+ Mockito.when(node.getOrderedNodes()).thenReturn(exprs);
+ return node;
+ }
+
+ private static List<QueryModelNode> getNodes(final String sparql) throws Exception {
+ final NodeCollector collector = new NodeCollector();
+ new SPARQLParser().parseQuery(sparql, null).getTupleExpr().visit(collector);
+ return collector.getTupleExpr();
+ }
+
+ private static class NodeCollector extends QueryModelVisitorBase<RuntimeException> {
+ private final List<QueryModelNode> stPatterns = new ArrayList<>();
+
+ public List<QueryModelNode> getTupleExpr() {
+ return stPatterns;
+ }
+
+ @Override
+ public void meet(final FunctionCall node) {
+ stPatterns.add(node);
+ }
+
+ @Override
+ public void meet(final StatementPattern node) {
+ stPatterns.add(node);
+ }
+ }
+
+ private static class FunctionCallCollector extends QueryModelVisitorBase<RuntimeException> {
+ private final List<FunctionCall> filters = new ArrayList<>();
+
+ public List<FunctionCall> getTupleExpr() {
+ return filters;
+ }
+
+ @Override
+ public void meet(final FunctionCall node) {
+ filters.add(node);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java
new file mode 100644
index 0000000..66de3fa
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoRyaSailFactory;
+import org.apache.rya.indexing.OptionalConfigUtils;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+
+import com.mongodb.MongoClient;
+
+public class MongoGeoTemporalIndexIT {
+ private static final String URI_PROPERTY_AT_TIME = "Property:atTime";
+
+ private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+ private MongoDBRdfConfiguration conf;
+ private SailRepositoryConnection conn;
+ private MongoClient mongoClient;
+
+ @Before
+ public void setUp() throws Exception{
+ mongoClient = MockMongoFactory.newFactory().newMongoClient();
+ conf = new MongoDBRdfConfiguration();
+ conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "test");
+ conf.set(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya");
+ conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya");
+ conf.setBoolean(ConfigUtils.USE_MONGO, true);
+ conf.setBoolean(OptionalConfigUtils.USE_GEOTEMPORAL, true);
+ conf.setMongoClient(mongoClient);
+
+ final Sail sail = GeoRyaSailFactory.getInstance(conf);
+ conn = new SailRepository(sail).getConnection();
+ conn.begin();
+
+ addStatements();
+ }
+
+ @Test
+ public void ensureInEventStore_Test() throws Exception {
+ final MongoGeoTemporalIndexer indexer = new MongoGeoTemporalIndexer();
+ indexer.initIndexer(conf, mongoClient);
+
+ final EventStorage events = indexer.getEventStorage(conf);
+ final RyaURI subject = new RyaURI("urn:event1");
+ final Optional<Event> event = events.get(subject);
+ assertTrue(event.isPresent());
+ }
+
+ @Test
+ public void constantSubjQuery_Test() throws Exception {
+ final String query =
+ "PREFIX time: <http://www.w3.org/2006/time#> \n"
+ + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+ + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+ + "SELECT * "
+ + "WHERE { "
+ + " <urn:event1> time:atTime ?time . "
+ + " <urn:event1> geo:asWKT ?point . "
+ + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+ + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) "
+ + "}";
+
+ final TupleQueryResult rez = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate();
+ final Set<BindingSet> results = new HashSet<>();
+ while(rez.hasNext()) {
+ final BindingSet bs = rez.next();
+ results.add(bs);
+ }
+ final MapBindingSet expected = new MapBindingSet();
+ expected.addBinding("point", VF.createLiteral("POINT (0 0)"));
+ expected.addBinding("time", VF.createLiteral("2015-12-30T12:00:00Z"));
+
+ assertEquals(1, results.size());
+ assertEquals(expected, results.iterator().next());
+ }
+
+ @Test
+ public void variableSubjQuery_Test() throws Exception {
+ final String query =
+ "PREFIX time: <http://www.w3.org/2006/time#> \n"
+ + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+ + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+ + "SELECT * "
+ + "WHERE { "
+ + " ?subj time:atTime ?time . "
+ + " ?subj geo:asWKT ?point . "
+ + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+ + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) "
+ + "}";
+
+ final TupleQueryResult rez = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate();
+ final List<BindingSet> results = new ArrayList<>();
+ while(rez.hasNext()) {
+ final BindingSet bs = rez.next();
+ results.add(bs);
+ }
+ final MapBindingSet expected1 = new MapBindingSet();
+ expected1.addBinding("point", VF.createLiteral("POINT (0 0)"));
+ expected1.addBinding("time", VF.createLiteral("2015-12-30T12:00:00Z"));
+
+ final MapBindingSet expected2 = new MapBindingSet();
+ expected2.addBinding("point", VF.createLiteral("POINT (1 1)"));
+ expected2.addBinding("time", VF.createLiteral("2015-12-30T12:00:00Z"));
+
+ assertEquals(2, results.size());
+ assertEquals(expected1, results.get(0));
+ assertEquals(expected2, results.get(1));
+ }
+
+ private void addStatements() throws Exception {
+ URI subject = VF.createURI("urn:event1");
+ final URI predicate = VF.createURI(URI_PROPERTY_AT_TIME);
+ Value object = VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ conn.add(VF.createStatement(subject, predicate, object));
+
+ object = VF.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ conn.add(VF.createStatement(subject, GeoConstants.GEO_AS_WKT, object));
+
+ subject = VF.createURI("urn:event2");
+ object = VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString());
+ conn.add(VF.createStatement(subject, predicate, object));
+
+ object = VF.createLiteral("Point(1 1)", GeoConstants.XMLSCHEMA_OGC_WKT);
+ conn.add(VF.createStatement(subject, GeoConstants.GEO_AS_WKT, object));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/model/EventQueryNodeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/model/EventQueryNodeTest.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/model/EventQueryNodeTest.java
new file mode 100644
index 0000000..d9e0294
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/model/EventQueryNodeTest.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.model;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IndexingFunctionRegistry;
+import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.geotemporal.GeoTemporalTestBase;
+import org.apache.rya.indexing.geotemporal.mongo.MongoEventStorage;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.mongodb.MockMongoFactory;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.mongodb.MongoClient;
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.PrecisionModel;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Unit tests the methods of {@link EventQueryNode}.
+ */
+public class EventQueryNodeTest extends GeoTemporalTestBase {
+ private static final GeometryFactory GF = new GeometryFactory(new PrecisionModel(), 4326);
+ private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+    /**
+     * Building a node from a geo pattern and a temporal pattern whose subject
+     * variables differ ({@code point} vs {@code time}) is expected to be
+     * rejected with an {@link IllegalStateException}.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void constructor_differentSubjects() throws Exception {
+        final Var geoSubj = new Var("point");
+        final Var geoPred = new Var("-const-http://www.opengis.net/ont/geosparql#asWKT", VF.createURI("http://www.opengis.net/ont/geosparql#asWKT"));
+        final Var geoObj = new Var("wkt");
+        final StatementPattern geoSP = new StatementPattern(geoSubj, geoPred, geoObj);
+
+        final Var timeSubj = new Var("time");
+        // As with geoPred, only the Var name carries the "-const-" marker; the URI value is the plain predicate URI.
+        final Var timePred = new Var("-const-http://www.w3.org/2006/time#inXSDDateTime", VF.createURI("http://www.w3.org/2006/time#inXSDDateTime"));
+        final Var timeObj = new Var("time");
+        final StatementPattern timeSP = new StatementPattern(timeSubj, timePred, timeObj);
+
+        // build() is expected to throw: the two patterns do not share a subject.
+        new EventQueryNode.EventQueryNodeBuilder()
+            .setStorage(mock(EventStorage.class))
+            .setGeoPattern(geoSP)
+            .setTemporalPattern(timeSP)
+            .setGeoFilters(new ArrayList<IndexingExpr>())
+            .setTemporalFilters(new ArrayList<IndexingExpr>())
+            .setUsedFilters(new ArrayList<>())
+            .build();
+    }
+
+    /**
+     * Building a node from a geo pattern whose predicate is an unbound
+     * variable is expected to be rejected with an
+     * {@link IllegalStateException}.
+     *
+     * Both patterns share the subject variable {@code event}, so the
+     * different-subject case covered by
+     * {@link #constructor_differentSubjects()} cannot be what raises the
+     * exception here.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void constructor_variablePredicate() throws Exception {
+        // A pattern that has a variable for its predicate.
+        final Var geoSubj = new Var("event");
+        final Var geoPred = new Var("geo");
+        final Var geoObj = new Var("wkt");
+        final StatementPattern geoSP = new StatementPattern(geoSubj, geoPred, geoObj);
+
+        // Same subject as the geo pattern; constant predicate whose URI value carries no "-const-" marker.
+        final Var timeSubj = new Var("event");
+        final Var timePred = new Var("-const-http://www.w3.org/2006/time#inXSDDateTime", VF.createURI("http://www.w3.org/2006/time#inXSDDateTime"));
+        final Var timeObj = new Var("time");
+        final StatementPattern timeSP = new StatementPattern(timeSubj, timePred, timeObj);
+
+        // build() is expected to throw: the geo predicate is not a constant.
+        new EventQueryNode.EventQueryNodeBuilder()
+            .setStorage(mock(EventStorage.class))
+            .setGeoPattern(geoSP)
+            .setTemporalPattern(timeSP)
+            .setGeoFilters(new ArrayList<IndexingExpr>())
+            .setTemporalFilters(new ArrayList<IndexingExpr>())
+            .setUsedFilters(new ArrayList<>())
+            .build();
+    }
+
+    /**
+     * Stores two events with the same geometry and instant, then evaluates a
+     * node whose patterns both use the constant subject
+     * {@code urn:event-1111}. Exactly one binding set, holding {@code wkt}
+     * and {@code time}, is expected.
+     *
+     * The Mongo client and the result iteration are closed when the test
+     * finishes, whether it passes or fails.
+     */
+    @Test
+    public void evaluate_constantSubject() throws Exception {
+        final MongoClient client = MockMongoFactory.newFactory().newMongoClient();
+        try {
+            final EventStorage storage = new MongoEventStorage(client, "testDB");
+            final Geometry geo = GF.createPoint(new Coordinate(1, 1));
+            final TemporalInstant temp = new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0);
+            final Event event = Event.builder()
+                .setSubject(new RyaURI("urn:event-1111"))
+                .setGeometry(geo)
+                .setTemporalInstant(temp)
+                .build();
+
+            // Same geometry and instant, different subject: must not be returned.
+            final Event otherEvent = Event.builder()
+                .setSubject(new RyaURI("urn:event-2222"))
+                .setGeometry(geo)
+                .setTemporalInstant(temp)
+                .build();
+
+            storage.create(event);
+            storage.create(otherEvent);
+
+            final String query =
+                  "PREFIX time: <http://www.w3.org/2006/time#> \n"
+                + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+                + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+                + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+                + "SELECT ?event ?time ?point ?wkt "
+                + "WHERE { "
+                + " <urn:event-1111> time:atTime ?time . "
+                + " <urn:event-1111> geo:asWKT ?wkt . "
+                + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+                + " FILTER(tempo:equals(?time, \"" + temp.toString() + "\")) "
+                + "}";
+
+            final EventQueryNode node = buildNode(storage, query);
+            final MapBindingSet expected = new MapBindingSet();
+            expected.addBinding("wkt", VF.createLiteral("POINT (1 1)"));
+            expected.addBinding("time", VF.createLiteral(temp.toString()));
+
+            final CloseableIteration<BindingSet, QueryEvaluationException> rez = node.evaluate(new MapBindingSet());
+            try {
+                int count = 0;
+                assertTrue(rez.hasNext());
+                while (rez.hasNext()) {
+                    assertEquals(expected, rez.next());
+                    count++;
+                }
+                assertEquals(1, count);
+            } finally {
+                rez.close();
+            }
+        } finally {
+            client.close();
+        }
+    }
+
+    /**
+     * Stores two events at different points and evaluates a node whose
+     * patterns share the variable subject {@code ?event}. Both events are
+     * expected back, in the order (1 1) then (-1 -1).
+     *
+     * The result count is asserted before individual elements are read so
+     * that a short result fails with an assertion message rather than an
+     * {@link IndexOutOfBoundsException}. The Mongo client and the result
+     * iteration are closed when the test finishes.
+     */
+    @Test
+    public void evaluate_variableSubject() throws Exception {
+        final MongoClient client = MockMongoFactory.newFactory().newMongoClient();
+        try {
+            final EventStorage storage = new MongoEventStorage(client, "testDB");
+            final TemporalInstant temp = new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0);
+            final Event event = Event.builder()
+                .setSubject(new RyaURI("urn:event-1111"))
+                .setGeometry(GF.createPoint(new Coordinate(1, 1)))
+                .setTemporalInstant(temp)
+                .build();
+
+            final Event otherEvent = Event.builder()
+                .setSubject(new RyaURI("urn:event-2222"))
+                .setGeometry(GF.createPoint(new Coordinate(-1, -1)))
+                .setTemporalInstant(temp)
+                .build();
+
+            storage.create(event);
+            storage.create(otherEvent);
+
+            final String query =
+                  "PREFIX time: <http://www.w3.org/2006/time#> \n"
+                + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+                + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+                + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+                + "SELECT ?event ?time ?point ?wkt "
+                + "WHERE { "
+                + " ?event time:atTime ?time . "
+                + " ?event geo:asWKT ?wkt . "
+                + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+                + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) "
+                + "}";
+
+            final EventQueryNode node = buildNode(storage, query);
+            final MapBindingSet expected1 = new MapBindingSet();
+            expected1.addBinding("wkt", VF.createLiteral("POINT (1 1)"));
+            expected1.addBinding("time", VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()));
+            final MapBindingSet expected2 = new MapBindingSet();
+            expected2.addBinding("wkt", VF.createLiteral("POINT (-1 -1)"));
+            expected2.addBinding("time", VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()));
+
+            final List<BindingSet> actual = new ArrayList<>();
+            final CloseableIteration<BindingSet, QueryEvaluationException> rez = node.evaluate(new MapBindingSet());
+            try {
+                while (rez.hasNext()) {
+                    actual.add(rez.next());
+                }
+            } finally {
+                rez.close();
+            }
+
+            // Check the count first so the element reads below cannot go out of bounds.
+            assertEquals(2, actual.size());
+            assertEquals(expected1, actual.get(0));
+            assertEquals(expected2, actual.get(1));
+        } finally {
+            client.close();
+        }
+    }
+
+    /**
+     * Stores two events and evaluates a variable-subject node with
+     * {@code event} already bound to {@code urn:event-2222}. Only that
+     * event's {@code wkt} and {@code time} bindings are expected.
+     *
+     * The Mongo client and the result iteration are closed when the test
+     * finishes, whether it passes or fails.
+     */
+    @Test
+    public void evaluate_variableSubject_existingBindingset() throws Exception {
+        final MongoClient client = MockMongoFactory.newFactory().newMongoClient();
+        try {
+            final EventStorage storage = new MongoEventStorage(client, "testDB");
+            final TemporalInstant temp = new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0);
+            final Event event = Event.builder()
+                .setSubject(new RyaURI("urn:event-1111"))
+                .setGeometry(GF.createPoint(new Coordinate(1, 1)))
+                .setTemporalInstant(temp)
+                .build();
+
+            final Event otherEvent = Event.builder()
+                .setSubject(new RyaURI("urn:event-2222"))
+                .setGeometry(GF.createPoint(new Coordinate(-1, -1)))
+                .setTemporalInstant(temp)
+                .build();
+
+            storage.create(event);
+            storage.create(otherEvent);
+
+            final String query =
+                  "PREFIX time: <http://www.w3.org/2006/time#> \n"
+                + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+                + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+                + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+                + "SELECT ?event ?time ?point ?wkt "
+                + "WHERE { "
+                + " ?event time:atTime ?time . "
+                + " ?event geo:asWKT ?wkt . "
+                + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+                + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) "
+                + "}";
+
+            final EventQueryNode node = buildNode(storage, query);
+            final MapBindingSet existingBindings = new MapBindingSet();
+            existingBindings.addBinding("event", VF.createURI("urn:event-2222"));
+
+            final MapBindingSet expected = new MapBindingSet();
+            expected.addBinding("wkt", VF.createLiteral("POINT (-1 -1)"));
+            expected.addBinding("time", VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()));
+
+            final List<BindingSet> actual = new ArrayList<>();
+            final CloseableIteration<BindingSet, QueryEvaluationException> rez = node.evaluate(existingBindings);
+            try {
+                while (rez.hasNext()) {
+                    actual.add(rez.next());
+                }
+            } finally {
+                rez.close();
+            }
+            assertEquals(1, actual.size());
+            assertEquals(expected, actual.get(0));
+        } finally {
+            client.close();
+        }
+    }
+
+ @Test
+ public void evaluate_variableSubject_existingBindingsetWrongFilters() throws Exception {
+ final MongoClient client = MockMongoFactory.newFactory().newMongoClient();
+ final EventStorage storage = new MongoEventStorage(client, "testDB");
+ RyaURI subject = new RyaURI("urn:event-1111");
+ Geometry geo = GF.createPoint(new Coordinate(1, 1));
+ final TemporalInstant temp = new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0);
+ final Event event = Event.builder()
+ .setSubject(subject)
+ .setGeometry(geo)
+ .setTemporalInstant(temp)
+ .build();
+
+ subject = new RyaURI("urn:event-2222");
+ geo = GF.createPoint(new Coordinate(-10, -10));
+ final Event otherEvent = Event.builder()
+ .setSubject(subject)
+ .setGeometry(geo)
+ .setTemporalInstant(temp)
+ .build();
+
+ storage.create(event);
+ storage.create(otherEvent);
+
+ final String query =
+ "PREFIX time: <http://www.w3.org/2006/time#> \n"
+ + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+ + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+ + "SELECT ?event ?time ?point ?wkt "
+ + "WHERE { "
+ + " ?event time:atTime ?time . "
+ + " ?event geo:asWKT ?wkt . "
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+ + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) "
+ + "}";
+
+ final EventQueryNode node = buildNode(storage, query);
+ final MapBindingSet existingBindings = new MapBindingSet();
+ existingBindings.addBinding("event", VF.createURI("urn:event-2222"));
+ final CloseableIteration<BindingSet, QueryEvaluationException> rez = node.evaluate(existingBindings);
+ final MapBindingSet expected = new MapBindingSet();
+ expected.addBinding("wkt", VF.createLiteral("POINT (-1 -1)"));
+ expected.addBinding("time", VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()));
+
+ assertFalse(rez.hasNext());
+ }
+
+    /**
+     * Builds an {@link EventQueryNode} from a SPARQL query string.
+     *
+     * The statement patterns and filter function calls are taken from
+     * {@code getSps} and {@code getFilters}. Each filter becomes an
+     * {@link IndexingExpr} bound to the first statement pattern and is
+     * classified as geo or temporal by its function type. The first pattern
+     * is used as the temporal pattern and the second as the geo pattern.
+     *
+     * @param store the storage the node will query.
+     * @param query the SPARQL query to take patterns and filters from.
+     * @return the node built from the query.
+     * @throws Exception if the query cannot be processed or the node cannot be built.
+     */
+    private EventQueryNode buildNode(final EventStorage store, final String query) throws Exception {
+        final List<IndexingExpr> geoExprs = new ArrayList<>();
+        final List<IndexingExpr> temporalExprs = new ArrayList<>();
+        final List<StatementPattern> patterns = getSps(query);
+        final List<FunctionCall> calls = getFilters(query);
+
+        for (final FunctionCall call : calls) {
+            final URI function = new URIImpl(call.getURI());
+            final String resultVarName = IndexingFunctionRegistry.getResultVarFromFunctionCall(function, call.getArgs()).getName();
+            final IndexingExpr expr = new IndexingExpr(function, patterns.get(0), extractArguments(resultVarName, call));
+            // Anything that is not a geo function is treated as temporal.
+            final boolean isGeo = IndexingFunctionRegistry.getFunctionType(function) == FUNCTION_TYPE.GEO;
+            (isGeo ? geoExprs : temporalExprs).add(expr);
+        }
+
+        final StatementPattern geoPattern = patterns.get(1);
+        final StatementPattern temporalPattern = patterns.get(0);
+
+        final EventQueryNode.EventQueryNodeBuilder builder = new EventQueryNode.EventQueryNodeBuilder();
+        return builder
+            .setStorage(store)
+            .setGeoPattern(geoPattern)
+            .setTemporalPattern(temporalPattern)
+            .setGeoFilters(geoExprs)
+            .setTemporalFilters(temporalExprs)
+            .setUsedFilters(calls)
+            .build();
+    }
+
+ private Value[] extractArguments(final String matchName, final FunctionCall call) {
+ final Value args[] = new Value[call.getArgs().size() - 1];
+ int argI = 0;
+ for (int i = 0; i != call.getArgs().size(); ++i) {
+ final ValueExpr arg = call.getArgs().get(i);
+ if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
+ continue;
+ }
+ if (arg instanceof ValueConstant) {
+ args[argI] = ((ValueConstant)arg).getValue();
+ } else if (arg instanceof Var && ((Var)arg).hasValue()) {
+ args[argI] = ((Var)arg).getValue();
+ } else {
+ throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
+ }
+ ++argI;
+ }
+ return args;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverterTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverterTest.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverterTest.java
new file mode 100644
index 0000000..3f2f9d5
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverterTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.mongo.EventDocumentConverter;
+import org.bson.Document;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.PrecisionModel;
+
+/**
+ * Tests the methods of {@link EventDocumentConverter}.
+ */
+public class EventDocumentConverterTest {
+    private static final GeometryFactory GF = new GeometryFactory(new PrecisionModel(), 4326);
+
+    /**
+     * Round trip: an {@link Event} converted to a {@link Document} and back
+     * again is expected to equal the original.
+     */
+    @Test
+    public void to_and_from_document() throws DocumentConverterException {
+        final Geometry location = GF.createPoint(new Coordinate(10, 10));
+        final TemporalInstant when = new TemporalInstantRfc3339(DateTime.now());
+
+        // The Event that goes through the round trip.
+        final Event.Builder builder = Event.builder();
+        builder.setSubject(new RyaURI("urn:event/001"));
+        builder.setGeometry(location);
+        builder.setTemporalInstant(when);
+        final Event original = builder.build();
+
+        // Event -> Document.
+        final EventDocumentConverter toDoc = new EventDocumentConverter();
+        final Document asDocument = toDoc.toDocument(original);
+
+        // Document -> Event, using a fresh converter.
+        final EventDocumentConverter fromDoc = new EventDocumentConverter();
+        final Event roundTripped = fromDoc.fromDocument(asDocument);
+
+        assertEquals(original, roundTripped);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategyTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategyTest.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategyTest.java
new file mode 100644
index 0000000..edce1ec
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategyTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IndexingFunctionRegistry;
+import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.GeoTemporalTestBase;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.GeoPolicy;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.TemporalPolicy;
+import org.apache.rya.indexing.geotemporal.mongo.GeoTemporalMongoDBStorageStrategy;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ContextStatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+
+import com.mongodb.DBObject;
+import com.mongodb.util.JSON;
+
+/**
+ * Tests the {@link GeoTemporalMongoDBStorageStrategy}, which turns the filters
+ * into mongo {@link DBObject}s used to query.
+ *
+ * This test also ensures all possible filter functions are accounted for.
+ * @see TemporalPolicy Temporal Filter Functions
+ * @see GeoPolicy Geo Filter Functions
+ */
+public class GeoTemporalMongoDBStorageStrategyTest extends GeoTemporalTestBase {
+ private GeoTemporalMongoDBStorageStrategy adapter;
+ /**
+ * Creates a fresh {@link GeoTemporalMongoDBStorageStrategy} before each test.
+ */
+ @Before
+ public void setup() {
+ adapter = new GeoTemporalMongoDBStorageStrategy();
+ }
+
+    /**
+     * With no geo filters and no temporal filters, the filter query is
+     * expected to be an empty document.
+     */
+    @Test
+    public void emptyFilters_test() throws Exception {
+        final List<IndexingExpr> noGeoFilters = new ArrayList<>();
+        final List<IndexingExpr> noTemporalFilters = new ArrayList<>();
+
+        final DBObject actual = adapter.getFilterQuery(noGeoFilters, noTemporalFilters);
+
+        final DBObject expected = (DBObject) JSON.parse("{ }");
+        assertEqualMongo(expected, actual);
+    }
+
+    /**
+     * A single geo filter ({@code geof:sfWithin}) and no temporal filters.
+     * The expected query is a {@code $geoWithin}/{@code $polygon} condition
+     * on {@code location}. Despite the method name, no temporal function is
+     * involved.
+     */
+    @Test
+    public void equalsInstantAfterInterval_onlyOneGeo() throws Exception {
+        final String query =
+              "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+            + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+            + "SELECT ?point ?wkt "
+            + "WHERE { "
+            + " ?point geo:asWKT ?wkt . "
+            + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+            + "}";
+        final List<IndexingExpr> geoFilters = new ArrayList<>();
+        final List<StatementPattern> patterns = getSps(query);
+        // The query has a single FILTER, so this loop body runs once.
+        for (final FunctionCall call : getFilters(query)) {
+            final URI function = new URIImpl(call.getURI());
+            final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(function, call.getArgs());
+            geoFilters.add(new IndexingExpr(function, patterns.get(0), extractArguments(resultVar.getName(), call)));
+        }
+
+        final DBObject actual = adapter.getFilterQuery(geoFilters, new ArrayList<IndexingExpr>());
+
+        final DBObject expected = (DBObject) JSON.parse(
+              "{ "
+            + "\"location\" : {"
+            + "\"$geoWithin\" : {"
+            + "\"$polygon\" : [ [ -3.0 , -2.0] , [ -3.0 , 2.0] , [ 1.0 , 2.0] , [ 1.0 , -2.0] , [ -3.0 , -2.0]]"
+            + "}"
+            + "}"
+            + "}");
+        assertEqualMongo(expected, actual);
+    }
+
+    /**
+     * Two geo filters ({@code geof:sfIntersects} and {@code geof:sfEquals})
+     * and no temporal filters. The expected query is an {@code $and} of the
+     * two {@code location} conditions. Despite the method name, no temporal
+     * function is involved.
+     */
+    @Test
+    public void equalsInstantAfterInterval_onlyGeos() throws Exception {
+
+        /*
+         * TODO: change filter functions for coverage
+         */
+
+        final String query =
+              "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+            + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+            + "SELECT ?point ?wkt "
+            + "WHERE { "
+            + " ?point geo:asWKT ?wkt . "
+            + " FILTER(geof:sfIntersects(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+            + " FILTER(geof:sfEquals(?wkt, \"POLYGON((-4 -3, -4 3, 2 3, 2 -3, -4 -3))\"^^geo:wktLiteral)) "
+            + "}";
+        final List<IndexingExpr> geoFilters = new ArrayList<>();
+        final List<StatementPattern> patterns = getSps(query);
+        for (final FunctionCall call : getFilters(query)) {
+            final URI function = new URIImpl(call.getURI());
+            final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(function, call.getArgs());
+            geoFilters.add(new IndexingExpr(function, patterns.get(0), extractArguments(resultVar.getName(), call)));
+        }
+
+        final DBObject actual = adapter.getFilterQuery(geoFilters, new ArrayList<IndexingExpr>());
+
+        final DBObject expected = (DBObject) JSON.parse(
+              "{ "
+            + "\"$and\" : [{"
+            + "\"location\" : [ [ -4.0 , -3.0] , [ -4.0 , 3.0] , [ 2.0 , 3.0] , [ 2.0 , -3.0] , [ -4.0 , -3.0]]"
+            + "}, {"
+            + "\"location\" : {"
+            + "\"$geoIntersects\" : {"
+            + "\"$polygon\" : [ [ -3.0 , -2.0] , [ -3.0 , 2.0] , [ 1.0 , 2.0] , [ 1.0 , -2.0] , [ -3.0 , -2.0]]"
+            + "}"
+            + "}"
+            + "}]"
+            + "}");
+        assertEqualMongo(expected, actual);
+    }
+
+    /**
+     * A single temporal filter ({@code tempo:equals}) and no geo filters.
+     * The expected query matches {@code instant} against the given date.
+     */
+    @Test
+    public void equalsInstantAfterInterval_onlyOneTemporal() throws Exception {
+        final String query =
+              "PREFIX time: <http://www.w3.org/2006/time#> \n"
+            + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+            + "SELECT ?event ?time "
+            + "WHERE { "
+            + " ?event time:atTime ?time . "
+            + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) . "
+            + "}";
+        final List<IndexingExpr> temporalFilters = new ArrayList<>();
+        final List<StatementPattern> patterns = getSps(query);
+        // The query has a single FILTER, so this loop body runs once.
+        for (final FunctionCall call : getFilters(query)) {
+            final URI function = new URIImpl(call.getURI());
+            final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(function, call.getArgs());
+            temporalFilters.add(new IndexingExpr(function, patterns.get(0), extractArguments(resultVar.getName(), call)));
+        }
+
+        final DBObject actual = adapter.getFilterQuery(new ArrayList<IndexingExpr>(), temporalFilters);
+
+        final DBObject expected = (DBObject) JSON.parse(
+              "{ "
+            + "\"instant\" : {"
+            + "\"$date\" : \"2015-12-30T12:00:00.000Z\""
+            + "}"
+            + "}");
+        assertEqualMongo(expected, actual);
+    }
+
+    /**
+     * Two temporal filters ({@code tempo:before} and
+     * {@code tempo:insideInterval}) and no geo filters. The expected query
+     * is an {@code $and} of the two {@code instant} conditions.
+     */
+    @Test
+    public void equalsInstantAfterInterval_onlyTemporal() throws Exception {
+        final String query =
+              "PREFIX time: <http://www.w3.org/2006/time#> \n"
+            + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+            + "SELECT ?event ?time "
+            + "WHERE { "
+            + " ?event time:atTime ?time . "
+            + " FILTER(tempo:before(?time, \"2015-12-30T12:00:00Z\")) . "
+            + " FILTER(tempo:insideInterval(?time, \"[1969-12-31T19:00:00-05:00,1969-12-31T19:00:01-05:00]\")) . "
+            + "}";
+        final List<IndexingExpr> temporalFilters = new ArrayList<>();
+        final List<StatementPattern> patterns = getSps(query);
+        for (final FunctionCall call : getFilters(query)) {
+            final URI function = new URIImpl(call.getURI());
+            final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(function, call.getArgs());
+            temporalFilters.add(new IndexingExpr(function, patterns.get(0), extractArguments(resultVar.getName(), call)));
+        }
+
+        final DBObject actual = adapter.getFilterQuery(new ArrayList<IndexingExpr>(), temporalFilters);
+
+        final DBObject expected = (DBObject) JSON.parse(
+              "{ "
+            + "\"$and\" : [{"
+            + "\"instant\" : {"
+            + "\"$gt\" : {"
+            + "\"$date\" : \"1970-01-01T00:00:00.000Z\""
+            + "},"
+            + "\"$lt\" : {"
+            + "\"$date\" : \"1970-01-01T00:00:01.000Z\""
+            + "},"
+            + "}}, {"
+            + "\"instant\" : {"
+            + "\"$lt\" : {"
+            + "\"$date\" : \"2015-12-30T12:00:00.000Z\""
+            + "}"
+            + "}"
+            + "}]"
+            + "}");
+        assertEqualMongo(expected, actual);
+    }
+
+    /**
+     * One geo filter ({@code geof:sfWithin}) and one temporal filter
+     * ({@code tempo:after}). Filters are sorted into geo and temporal lists
+     * by their function type; the expected query is an {@code $and} of the
+     * {@code location} condition and the {@code instant} condition.
+     */
+    @Test
+    public void equalsInstantAfterInterval_GeoTemporalOneEach() throws Exception {
+        final String query =
+              "PREFIX time: <http://www.w3.org/2006/time#> \n"
+            + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+            + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+            + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+            + "SELECT ?event ?time ?point ?wkt "
+            + "WHERE { "
+            + " ?event time:atTime ?time . "
+            + " ?point geo:asWKT ?wkt . "
+            + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+            + " FILTER(tempo:after(?time, \"2015-12-30T12:00:00Z\")) "
+            + "}";
+        final List<IndexingExpr> geoFilters = new ArrayList<>();
+        final List<IndexingExpr> temporalFilters = new ArrayList<>();
+        final List<StatementPattern> patterns = getSps(query);
+        for (final FunctionCall call : getFilters(query)) {
+            final URI function = new URIImpl(call.getURI());
+            final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(function, call.getArgs());
+            final IndexingExpr expr = new IndexingExpr(function, patterns.get(0), extractArguments(resultVar.getName(), call));
+            // Anything that is not a geo function is treated as temporal.
+            final boolean isGeo = IndexingFunctionRegistry.getFunctionType(function) == FUNCTION_TYPE.GEO;
+            (isGeo ? geoFilters : temporalFilters).add(expr);
+        }
+
+        final DBObject actual = adapter.getFilterQuery(geoFilters, temporalFilters);
+
+        final DBObject expected = (DBObject) JSON.parse(
+              "{ "
+            + "\"$and\" : [{"
+            + "\"location\" : {"
+            + "\"$geoWithin\" : {"
+            + "\"$polygon\" : [ [ -3.0 , -2.0] , [ -3.0 , 2.0] , [ 1.0 , 2.0] , [ 1.0 , -2.0] , [ -3.0 , -2.0]]"
+            + "},"
+            + "}}, {"
+            + "\"instant\" : {"
+            + "\"$gt\" : {"
+            + "\"$date\" : \"2015-12-30T12:00:00.000Z\""
+            + "}"
+            + "}"
+            + "}]"
+            + "}");
+        assertEqualMongo(expected, actual);
+    }
+
+    /**
+     * Two geo filters ({@code geof:sfWithin}, {@code geof:sfEquals}) and two
+     * temporal filters ({@code tempo:hasEndInterval},
+     * {@code tempo:beforeInterval}). Filters are sorted into geo and temporal
+     * lists by their function type; the expected query is an {@code $and} of
+     * a geo {@code $and} and a temporal {@code $and}.
+     */
+    @Test
+    public void equalsInstantAfterInterval_GeoTemporalTwoEach() throws Exception {
+        final String query =
+              "PREFIX time: <http://www.w3.org/2006/time#> \n"
+            + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+            + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+            + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+            + "SELECT ?event ?time ?point ?wkt "
+            + "WHERE { "
+            + " ?event time:atTime ?time . "
+            + " ?point geo:asWKT ?wkt . "
+            + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) "
+            + " FILTER(geof:sfEquals(?wkt, \"POLYGON((-4 -3, -4 3, 2 3, 2 -3, -4 -3))\"^^geo:wktLiteral)) "
+            + " FILTER(tempo:hasEndInterval(?time, \"[1969-12-31T19:00:00-05:00,1969-12-31T19:00:01-05:00]\")) . "
+            + " FILTER(tempo:beforeInterval(?time, \"[1969-12-31T19:00:00-05:00,1969-12-31T19:00:01-05:00]\")) . "
+            + "}";
+        final List<IndexingExpr> geoFilters = new ArrayList<>();
+        final List<IndexingExpr> temporalFilters = new ArrayList<>();
+        final List<StatementPattern> patterns = getSps(query);
+        for (final FunctionCall call : getFilters(query)) {
+            final URI function = new URIImpl(call.getURI());
+            final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(function, call.getArgs());
+            final IndexingExpr expr = new IndexingExpr(function, patterns.get(0), extractArguments(resultVar.getName(), call));
+            // Anything that is not a geo function is treated as temporal.
+            final boolean isGeo = IndexingFunctionRegistry.getFunctionType(function) == FUNCTION_TYPE.GEO;
+            (isGeo ? geoFilters : temporalFilters).add(expr);
+        }
+
+        final DBObject actual = adapter.getFilterQuery(geoFilters, temporalFilters);
+
+        final DBObject expected = (DBObject) JSON.parse(
+              "{ "
+            + "\"$and\" : [{"
+            + "\"$and\" : [{"
+            + "\"location\" : [ [ -4.0 , -3.0] , [ -4.0 , 3.0] , [ 2.0 , 3.0] , [ 2.0 , -3.0] , [ -4.0 , -3.0]]"
+            + "}, {"
+            + "\"location\" : {"
+            + "\"$geoWithin\" : {"
+            + "\"$polygon\" : [ [ -3.0 , -2.0] , [ -3.0 , 2.0] , [ 1.0 , 2.0] , [ 1.0 , -2.0] , [ -3.0 , -2.0]]"
+            + "}"
+            + "}"
+            + "}]"
+            + "},{"
+            + "\"$and\" : [{"
+            + "\"instant\" : {"
+            + "\"$lt\" : {"
+            + "\"$date\" : \"1970-01-01T00:00:00.000Z\""
+            + "},"
+            + "}"
+            + "}, {"
+            + "\"instant\" : {"
+            + "\"$date\" : \"1970-01-01T00:00:01.000Z\""
+            + "}"
+            + "}]"
+            + "}"
+            + "]"
+            + "}");
+        assertEqualMongo(expected, actual);
+    }
+
+ @Test
+ public void equalsInstantAfterInterval_GeoTemporalSingleGeoTwoTemporal() throws Exception { // One geo FILTER and two temporal FILTERs: the single geo clause is expected unwrapped, the temporal pair under its own $and.
+ final String query =
+ "PREFIX time: <http://www.w3.org/2006/time#> \n"
+ + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"
+ + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>"
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>"
+ + "SELECT ?event ?time ?point ?wkt "
+ + "WHERE { "
+ + " ?event time:atTime ?time . "
+ + " ?point geo:asWKT ?wkt . "
+ + " FILTER(geof:sfEquals(?wkt, \"POLYGON((-4 -3, -4 3, 2 3, 2 -3, -4 -3))\"^^geo:wktLiteral)) ."
+ + " FILTER(tempo:hasBeginningInterval(?time, \"[1969-12-31T19:00:00-05:00,1969-12-31T19:00:01-05:00]\")) . "
+ + " FILTER(tempo:afterInterval(?time, \"[1969-12-31T19:00:00-05:00,1969-12-31T19:00:01-05:00]\"))"
+ + "}";
+ final List<IndexingExpr> geoFilters = new ArrayList<>();
+ final List<IndexingExpr> temporalFilters = new ArrayList<>();
+ final List<StatementPattern> sps = getSps(query);
+ final List<FunctionCall> filters = getFilters(query);
+ for(final FunctionCall filter : filters) { // Sort every FILTER function call into the geo or the temporal bucket.
+ final URI filterURI = new URIImpl(filter.getURI());
+ final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, filter.getArgs());
+ final IndexingExpr expr = new IndexingExpr(filterURI, sps.get(0), extractArguments(objVar.getName(), filter)); // Every expression is tied to the first statement pattern, whichever variable the filter uses.
+ if(IndexingFunctionRegistry.getFunctionType(filterURI) == FUNCTION_TYPE.GEO) {
+ geoFilters.add(expr);
+ } else { // Anything that is not a GEO function is treated as temporal.
+ temporalFilters.add(expr);
+ }
+ }
+ final DBObject actual = adapter.getFilterQuery(geoFilters, temporalFilters);
+ final String expectedString = // Geo clause first, then the temporal group: afterInterval as $gt on the interval end, hasBeginningInterval as equality on the interval start.
+ "{ "
+ + "\"$and\" : [{"
+ + "\"location\" : [ [ -4.0 , -3.0] , [ -4.0 , 3.0] , [ 2.0 , 3.0] , [ 2.0 , -3.0] , [ -4.0 , -3.0]]"
+ + "},{"
+ + "\"$and\" : [{"
+ + "\"instant\" : {"
+ + "\"$gt\" : {"
+ + "\"$date\" : \"1970-01-01T00:00:01.000Z\""
+ + "}," // NOTE(review): leaves a trailing comma inside the JSON object; presumably tolerated by JSON.parse - confirm.
+ + "}"
+ + "}, {"
+ + "\"instant\" : {"
+ + "\"$date\" : \"1970-01-01T00:00:00.000Z\""
+ + "}"
+ + "}]"
+ + "}"
+ + "]"
+ + "}";
+ final DBObject expected = (DBObject) JSON.parse(expectedString);
+ assertEqualMongo(expected, actual);
+ }
+
+ @Test
+ public void serializeTest() { // Serializes three statements about the same subject (geometry, instant, interval) and checks each resulting document.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Resource subject = vf.createURI("foo:subj");
+ final Resource context = vf.createURI("foo:context");
+
+ // GEO: a WKT point literal is expected to serialize to an _id plus a "location" coordinate array.
+ URI predicate = GeoConstants.GEO_AS_WKT;
+ Value object = vf.createLiteral("Point(-77.03524 38.889468)", GeoConstants.XMLSCHEMA_OGC_WKT);
+
+ Statement statement = new ContextStatementImpl(subject, predicate, object, context);
+ DBObject actual = adapter.serialize(RdfToRyaConversions.convertStatement(statement));
+ String expectedString =
+ "{"
+ +"_id : -852305321, " // The same _id is expected in all three cases; presumably derived from the shared subject - confirm against the adapter.
+ +"location : [ [ -77.03524 , 38.889468]]"
+ + "}";
+ DBObject expected = (DBObject) JSON.parse(expectedString);
+ assertEqualMongo(expected, actual);
+
+ // TIME INSTANT: a single timestamp literal is expected to serialize to time.instant.
+ predicate = new URIImpl("Property:event:time");
+ object = vf.createLiteral("2015-12-30T12:00:00Z");
+ statement = new ContextStatementImpl(subject, predicate, object, context);
+ actual = adapter.serialize(RdfToRyaConversions.convertStatement(statement));
+ expectedString =
+ "{"
+ +"_id : -852305321, "
+ +"time: {"
+ + "instant : {"
+ +"\"$date\" : \"2015-12-30T12:00:00.000Z\""
+ + "}"
+ + "}"
+ + "}";
+ expected = (DBObject) JSON.parse(expectedString);
+ assertEqualMongo(expected, actual);
+
+ // TIME INTERVAL: a "[start,end]" literal is expected to serialize to time.start and time.end, with the -05:00 offsets expressed as UTC dates.
+ predicate = new URIImpl("Property:circa");
+ object = vf.createLiteral("[1969-12-31T19:00:00-05:00,1969-12-31T19:00:01-05:00]");
+ statement = new ContextStatementImpl(subject, predicate, object, context);
+ actual = adapter.serialize(RdfToRyaConversions.convertStatement(statement));
+ expectedString =
+ "{"
+ +"_id : -852305321, "
+ +"time: {"
+ + "start : {"
+ +"\"$date\" : \"1970-01-01T00:00:00.000Z\""
+ + "},"
+ + "end : {"
+ +"\"$date\" : \"1970-01-01T00:00:01.000Z\""
+ + "}"
+ + "}"
+ + "}";
+ expected = (DBObject) JSON.parse(expectedString);
+ assertEqualMongo(expected, actual);
+ }
+
+ private Value[] extractArguments(final String matchName, final FunctionCall call) {
+ final Value args[] = new Value[call.getArgs().size() - 1];
+ int argI = 0;
+ for (int i = 0; i != call.getArgs().size(); ++i) {
+ final ValueExpr arg = call.getArgs().get(i);
+ if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
+ continue;
+ }
+ if (arg instanceof ValueConstant) {
+ args[argI] = ((ValueConstant)arg).getValue();
+ } else if (arg instanceof Var && ((Var)arg).hasValue()) {
+ args[argI] = ((Var)arg).getValue();
+ } else {
+ throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
+ }
+ ++argI;
+ }
+ return args;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorageTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorageTest.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorageTest.java
new file mode 100644
index 0000000..5b07460
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorageTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventAlreadyExistsException;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.PrecisionModel;
+
+/**
+ * Integration tests the methods of {@link MongoEventStorage}.
+ */
+public class MongoEventStorageTest extends MongoITBase {
+
+    private static final String RYA_INSTANCE_NAME = "testInstance";
+    private static final GeometryFactory GF = new GeometryFactory(new PrecisionModel(), 4326);
+
+    /**
+     * An Event written with create() must come back unchanged from get().
+     */
+    @Test
+    public void create_and_get() throws Exception {
+        final Geometry geo = GF.createPoint(new Coordinate(10, 10));
+        final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.now());
+
+        // An Event that will be stored.
+        final Event event = Event.builder()
+            .setSubject(new RyaURI("urn:event/001"))
+            .setGeometry(geo)
+            .setTemporalInstant(instant)
+            .build();
+
+        // Create it.
+        final EventStorage storage = new MongoEventStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
+        storage.create(event);
+
+        // Get it.
+        final Optional<Event> storedEvent = storage.get(new RyaURI("urn:event/001"));
+
+        // Verify the correct value was returned.
+        assertEquals(event, storedEvent.get());
+    }
+
+    /**
+     * Creating a second Event for an already stored subject must be rejected
+     * with an {@link EventAlreadyExistsException}.
+     */
+    @Test
+    public void can_not_create_with_same_subject() throws Exception {
+        final Geometry geo = GF.createPoint(new Coordinate(10, 10));
+        final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.now());
+
+        // An Event that will be stored.
+        final Event event = Event.builder()
+            .setSubject(new RyaURI("urn:event/001"))
+            .setGeometry(geo)
+            .setTemporalInstant(instant)
+            .build();
+
+        // Create it.
+        final EventStorage storage = new MongoEventStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
+        storage.create(event);
+
+        // Try to create it again. This will fail.
+        // A flag is used instead of @Test(expected = ...) so that only the
+        // second create() call is allowed to throw.
+        boolean failed = false;
+        try {
+            storage.create(event);
+        } catch(final EventAlreadyExistsException e) {
+            failed = true;
+        }
+        assertTrue(failed);
+    }
+
+    /**
+     * Getting a subject that was never stored must return an empty Optional.
+     */
+    @Test
+    public void get_noneExisting() throws Exception {
+        // Get an Event that hasn't been created.
+        final EventStorage storage = new MongoEventStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
+        final Optional<Event> storedEvent = storage.get(new RyaURI("urn:event/000"));
+
+        // Verify nothing was returned.
+        assertFalse(storedEvent.isPresent());
+    }
+
+    /**
+     * Deleting a stored Event must report that something was deleted.
+     */
+    @Test
+    public void delete() throws Exception {
+        final Geometry geo = GF.createPoint(new Coordinate(10, 10));
+        final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.now());
+
+        // An Event that will be stored.
+        final Event event = Event.builder()
+            .setSubject(new RyaURI("urn:event/002"))
+            .setGeometry(geo)
+            .setTemporalInstant(instant)
+            .build();
+
+        // Create it.
+        final EventStorage storage = new MongoEventStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
+        storage.create(event);
+
+        // Delete it.
+        final boolean deleted = storage.delete( new RyaURI("urn:event/002") );
+
+        // Verify a document was deleted.
+        assertTrue( deleted );
+    }
+
+    /**
+     * Deleting a subject that was never stored must report that nothing was deleted.
+     */
+    @Test
+    public void delete_nonExisting() throws Exception {
+        // Delete an Event that has not been created.
+        final EventStorage storage = new MongoEventStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
+        final boolean deleted = storage.delete( new RyaURI("urn:event/003") );
+
+        // Verify no document was deleted.
+        assertFalse( deleted );
+    }
+
+    /**
+     * Updating a stored Event must replace its temporal instant with the new one.
+     */
+    @Test
+    public void update() throws Exception {
+        final EventStorage storage = new MongoEventStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
+        final Geometry geo = GF.createPoint(new Coordinate(10, 10));
+        final TemporalInstant originalInstant = new TemporalInstantRfc3339(DateTime.now());
+
+        // An Event that will be stored.
+        final Event event = Event.builder()
+            .setSubject(new RyaURI("urn:event/004"))
+            .setGeometry(geo)
+            .setTemporalInstant(originalInstant)
+            .build();
+
+        storage.create(event);
+
+        // Show the Event was stored.
+        Optional<Event> latest = storage.get(new RyaURI("urn:event/004"));
+        assertEquals(event, latest.get());
+
+        // Change the Event's instant. A day is added so the new value is certain
+        // to differ from the original: two consecutive DateTime.now() calls can
+        // return the same millisecond, which would let this test pass even if
+        // update() stored nothing.
+        final TemporalInstant updatedInstant = new TemporalInstantRfc3339(DateTime.now().plusDays(1));
+        final Event updated = Event.builder(event)
+            .setTemporalInstant(updatedInstant)
+            .build();
+
+        storage.update(event, updated);
+
+        // Fetch the Event and ensure it has the new value.
+        latest = storage.get(new RyaURI("urn:event/004"));
+
+        assertEquals(updated, latest.get());
+    }
+
+    /**
+     * An update whose old and new Events have different subjects must be rejected.
+     */
+    @Test(expected = EventStorageException.class)
+    public void update_differentSubjects() throws Exception {
+        final EventStorage storage = new MongoEventStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
+        final Geometry geo = GF.createPoint(new Coordinate(10, 10));
+        final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.now());
+
+        // Two objects that do not have the same Subjects.
+        final Event old = Event.builder()
+            .setSubject(new RyaURI("urn:event/001"))
+            .setGeometry(geo)
+            .setTemporalInstant(instant)
+            .build();
+
+        final Event updated = Event.builder()
+            .setSubject(new RyaURI("urn:event/002"))
+            .setGeometry(geo)
+            .setTemporalInstant(instant)
+            .build();
+
+        // The update will fail.
+        storage.update(old, updated);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexerIT.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexerIT.java
new file mode 100644
index 0000000..802e8c1
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexerIT.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Optional;
+
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.mongodb.MongoClient;
+import com.vividsolutions.jts.geom.Geometry;
+
+/**
+ * Integration tests the methods of {@link MongoGeoTemporalIndexer}.
+ */
+public class MongoGeoTemporalIndexerIT extends MongoITBase {
+
+ private MongoGeoTemporalIndexer indexer; // Rebuilt before every test by makeTestIndexer().
+
+ @Before
+ public void makeTestIndexer() throws Exception { // Configures the inherited conf and initializes a fresh indexer with it.
+ final MongoClient client = MockMongoFactory.newFactory().newMongoClient(); // NOTE(review): MongoITBase.setupTest() also creates a client and sets it on conf; this one presumably replaces it - confirm a second client is intended.
+ indexer = new MongoGeoTemporalIndexer();
+ conf.setMongoDBName("GEO_TEMPORAL_INDEXER_TEST");
+ conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "GEO_TEMPORAL_INDEXER_TEST"); // NOTE(review): presumably redundant with the setter call on the previous line - confirm.
+ conf.setMongoClient(client);
+ conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");
+ indexer.setConf(conf);
+ indexer.init();
+ }
+
+ @Test
+ public void ensureEvent() throws Exception { // Walks one subject through four states: geometry only, geometry + instant, instant only, neither.
+ final RyaStatement geoStmnt = statement(point(0, 0)); // point(...) and makeInstant(...) are not defined in this class; presumably inherited test helpers.
+ final RyaStatement timeStmnt = statement(makeInstant(0));
+
+ final EventStorage store = indexer.getEventStorage(conf);
+
+ indexer.storeStatement(geoStmnt); // Step 1: storing the geometry statement creates an Event that has only a geometry.
+ Optional<Event> evnt = store.get(geoStmnt.getSubject());
+ assertTrue(evnt.isPresent());
+ Event expected = Event.builder()
+ .setSubject(geoStmnt.getSubject())
+ .setGeometry(point(0, 0))
+ .build();
+ assertEquals(expected, evnt.get());
+
+ indexer.storeStatement(timeStmnt); // Step 2: storing the time statement for the same subject adds the instant to that Event.
+ evnt = store.get(timeStmnt.getSubject());
+ assertTrue(evnt.isPresent());
+ expected = Event.builder()
+ .setSubject(geoStmnt.getSubject())
+ .setGeometry(point(0, 0))
+ .setTemporalInstant(makeInstant(0))
+ .build();
+ assertEquals(expected, evnt.get());
+
+ indexer.deleteStatement(geoStmnt); // Step 3: deleting the geometry statement leaves an Event that has only the instant.
+ evnt = store.get(timeStmnt.getSubject());
+ assertTrue(evnt.isPresent());
+ expected = Event.builder()
+ .setSubject(timeStmnt.getSubject())
+ .setTemporalInstant(makeInstant(0))
+ .build();
+ assertEquals(expected, evnt.get());
+
+ indexer.deleteStatement(timeStmnt); // Step 4: deleting the time statement too is expected to leave an Event with only its subject, not to remove it.
+ evnt = store.get(timeStmnt.getSubject());
+ assertTrue(evnt.isPresent());
+ expected = Event.builder()
+ .setSubject(timeStmnt.getSubject())
+ .build();
+ assertEquals(expected, evnt.get());
+ }
+
+ private static RyaStatement statement(final Geometry geo) { // Builds a geo:asWKT statement about "uri:test" whose object is the geometry's toString() text.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Resource subject = vf.createURI("uri:test"); // Same subject as the temporal overload, so both statements describe one Event.
+ final URI predicate = GeoConstants.GEO_AS_WKT;
+ final Value object = vf.createLiteral(geo.toString(), GeoConstants.XMLSCHEMA_OGC_WKT);
+ return RdfToRyaConversions.convertStatement(new StatementImpl(subject, predicate, object));
+ }
+
+ private static RyaStatement statement(final TemporalInstant instant) { // Builds a "Property:atTime" statement about "uri:test" whose object is the instant's toString() text.
+ final ValueFactory vf = new ValueFactoryImpl();
+ final Resource subject = vf.createURI("uri:test");
+ final URI predicate = vf.createURI("Property:atTime");
+ final Value object = vf.createLiteral(instant.toString());
+ return RdfToRyaConversions.convertStatement(new StatementImpl(subject, predicate, object));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoITBase.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoITBase.java
new file mode 100644
index 0000000..bd7b5db
--- /dev/null
+++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/geotemporal/mongo/MongoITBase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.geotemporal.GeoTemporalTestBase;
+import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+
+import com.mongodb.MongoClient;
+
+/**
+ * A base class that may be used when implementing Mongo DB integration tests that
+ * use the JUnit framework.
+ */
+public class MongoITBase extends GeoTemporalTestBase {
+    /** Configuration shared with subclasses; {@link #setupTest()} points it at the per-test client. */
+    protected final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration( new Configuration() );
+
+    private MongoClient mongoClient = null;
+    private Set<String> originalDbNames = null;
+
+    /**
+     * Creates the Mongo client for the test, registers it on {@link #conf} and
+     * records which databases already exist so that {@link #cleanupTest()} can
+     * tell them apart from the ones the test creates.
+     *
+     * @throws Exception The client could not be created.
+     */
+    @Before
+    public void setupTest() throws Exception {
+        conf.setMongoDBName("testDB");
+        conf.setMongoInstance("testDB");
+        conf.setMongoPort("27017");
+
+        mongoClient = MockMongoFactory.newFactory().newMongoClient();
+        conf.setMongoClient(mongoClient);
+        // Store the names of the DBs that are present before running the test.
+        originalDbNames = new HashSet<>();
+        for(final String name : mongoClient.listDatabaseNames()) {
+            originalDbNames.add(name);
+        }
+    }
+
+    /**
+     * Drops every database that was not present when {@link #setupTest()} ran.
+     */
+    @After
+    public void cleanupTest() {
+        // If setup failed before the client existed there is nothing to clean up;
+        // returning here keeps the original setup failure from being joined by a
+        // NullPointerException.
+        if(mongoClient == null || originalDbNames == null) {
+            return;
+        }
+
+        // Remove any DBs that were created by the test.
+        for(final String dbName : mongoClient.listDatabaseNames()) {
+            if(!originalDbNames.contains(dbName)) {
+                mongoClient.dropDatabase(dbName);
+            }
+        }
+    }
+
+    /**
+     * Closes the client held by {@link MongoConnectorFactory} once all tests in the class have run.
+     */
+    @AfterClass
+    public static void shutdown() {
+        MongoConnectorFactory.closeMongoClient();
+    }
+
+    /**
+     * @return The {@link MongoClient} created for the current test, or {@code null}
+     *   if {@link #setupTest()} has not run yet.
+     */
+    public MongoClient getMongoClient() {
+        return mongoClient;
+    }
+}
\ No newline at end of file
[3/5] incubator-rya git commit: RYA-236 Changes to other indexers
Posted by mi...@apache.org.
RYA-236 Changes to other indexers
The GeoTemporal indexer is very closely related to the Entity
Indexer. Abstracted out some common areas.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/440a4bfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/440a4bfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/440a4bfd
Branch: refs/heads/master
Commit: 440a4bfd37407c8e1937776e0ecd0460d646bd8e
Parents: 63095d4
Author: isper3at <sm...@gmail.com>
Authored: Thu Feb 23 14:51:25 2017 -0500
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Wed Jun 14 13:28:09 2017 -0400
----------------------------------------------------------------------
.../apache/rya/indexing/TemporalTupleSet.java | 10 +-
.../rya/indexing/accumulo/ConfigUtils.java | 11 +-
.../temporal/AccumuloTemporalIndexer.java | 1 -
.../indexing/entity/EntityIndexException.java | 3 +-
.../indexing/entity/storage/EntityStorage.java | 44 +-----
.../storage/mongo/MongoEntityStorage.java | 6 +-
.../entity/update/BaseEntityIndexer.java | 9 +-
.../indexing/entity/update/EntityUpdater.java | 94 ++++---------
.../indexing/mongodb/AbstractMongoIndexer.java | 30 +++--
.../rya/indexing/mongodb/IndexingException.java | 53 ++++++++
.../TemporalMongoDBStorageStrategy.java | 41 +++---
.../mongodb/update/DocumentUpdater.java | 98 ++++++++++++++
.../mongodb/update/RyaObjectStorage.java | 135 +++++++++++++++++++
.../storage/mongo/MongoEntityStorageIT.java | 16 +--
.../rya/indexing/OptionalConfigUtils.java | 25 +++-
.../rya/indexing/accumulo/geo/GeoTupleSet.java | 23 ++--
.../mongodb/geo/GeoMongoDBStorageStrategy.java | 4 +-
17 files changed, 422 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
index 1c5b72c..808afdf 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
@@ -4,6 +4,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.joda.time.DateTime;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
@@ -35,7 +36,6 @@ import com.google.common.collect.Maps;
*/
import info.aduna.iteration.CloseableIteration;
-import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
//Indexing Node for temporal expressions to be inserted into execution plan
//to delegate temporal portion of query to temporal index
@@ -111,7 +111,7 @@ public class TemporalTupleSet extends ExternalTupleSet {
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings)
throws QueryEvaluationException {
final URI funcURI = filterInfo.getFunction();
- final SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI);
+ final SearchFunction searchFunction = new TemporalSearchFunctionFactory(conf, temporalIndexer).getSearchFunction(funcURI);
if(filterInfo.getArguments().length > 1) {
throw new IllegalArgumentException("Index functions do not support more than two arguments.");
@@ -123,12 +123,14 @@ public class TemporalTupleSet extends ExternalTupleSet {
//returns appropriate search function for a given URI
//search functions used by TemporalIndexer to query Temporal Index
- private class TemporalSearchFunctionFactory {
+ public static class TemporalSearchFunctionFactory {
private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
+ private final TemporalIndexer temporalIndexer;
Configuration conf;
- public TemporalSearchFunctionFactory(final Configuration conf) {
+ public TemporalSearchFunctionFactory(final Configuration conf, final TemporalIndexer temporalIndexer) {
this.conf = conf;
+ this.temporalIndexer = temporalIndexer;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index 41ae9ad..5cc1c44 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -124,6 +124,7 @@ public class ConfigUtils {
public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+ public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
@@ -427,6 +428,7 @@ public class ConfigUtils {
return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
}
+
/**
* @return The name of the Fluo Application this instance of RYA is using to
* incrementally update PCJs.
@@ -436,10 +438,12 @@ public class ConfigUtils {
return Optional.fromNullable(conf.get(FLUO_APP_NAME));
}
+
public static boolean getUseMongo(final Configuration conf) {
return conf.getBoolean(USE_MONGO, false);
}
+
public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
final List<String> indexList = Lists.newArrayList();
@@ -452,6 +456,7 @@ public class ConfigUtils {
indexList.add(MongoFreeTextIndexer.class.getName());
useFilterIndex = true;
}
+
if (getUseEntity(conf)) {
indexList.add(MongoEntityIndexer.class.getName());
optimizers.add(EntityIndexOptimizer.class.getName());
@@ -462,9 +467,9 @@ public class ConfigUtils {
useFilterIndex = true;
}
} else {
- if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
- conf.setPcjOptimizer(PCJOptimizer.class);
- }
+ if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
+ conf.setPcjOptimizer(PCJOptimizer.class);
+ }
if (getUsePcjUpdaterIndex(conf)) {
indexList.add(PrecomputedJoinIndexer.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
index e9d6c30..fcc1c58 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
@@ -378,7 +378,6 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
}
}
-
/**
* statements where the datetime is exactly the same as the queryInstant.
*/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java
index 61efc91..1e6abdb 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java
@@ -19,11 +19,12 @@
package org.apache.rya.indexing.entity;
import org.apache.rya.indexing.entity.model.TypedEntity;
+import org.apache.rya.indexing.mongodb.IndexingException;
/**
* An operation over the {@link TypedEntity} index failed to complete.
*/
-public class EntityIndexException extends Exception {
+public class EntityIndexException extends IndexingException {
private static final long serialVersionUID = 1L;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java
index 34dbf15..6f0b9ae 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java
@@ -22,12 +22,12 @@ import java.util.Optional;
import java.util.Set;
import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.entity.EntityIndexException;
import org.apache.rya.indexing.entity.model.Entity;
import org.apache.rya.indexing.entity.model.Property;
import org.apache.rya.indexing.entity.model.Type;
import org.apache.rya.indexing.entity.model.TypedEntity;
import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage;
import org.calrissian.mango.collect.CloseableIterator;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -37,36 +37,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Stores and provides access to {@link Entity}s.
*/
@DefaultAnnotation(NonNull.class)
-public interface EntityStorage {
-
- /**
- * Creates a new {@link Entity} within the storage. The new Entity's subject must be unique.
- *
- * @param entity - The {@link Entity} to create. (not null)
- * @throws EntityAlreadyExistsException An {@link Entity} could not be created because one already exists for the Subject.
- * @throws EntityStorageException A problem occurred while creating the Entity.
- */
- public void create(Entity entity) throws EntityAlreadyExistsException, EntityStorageException;
-
- /**
- * Get an {@link Entity} from the storage by its subject.
- *
- * @param subject - Identifies which {@link Entity} to get. (not null)
- * @return The {@link Entity} if one exists for the subject.
- * @throws EntityStorageException A problem occurred while fetching the Entity from the storage.
- */
- public Optional<Entity> get(RyaURI subject) throws EntityStorageException;
-
- /**
- * Update the state of an {@link Entity}.
- *
- * @param old - The Entity the changes were applied to. (not null)
- * @param updated - The updated Entity to store. (not null)
- * @throws StaleUpdateException The {@code old} Entity does not match any Entities that are stored.
- * @throws EntityStorageException A problem occurred while updating the Entity within the storage.
- */
- public void update(Entity old, Entity updated) throws StaleUpdateException, EntityStorageException;
-
+public interface EntityStorage extends RyaObjectStorage<Entity> {
/**
* Search the stored {@link Entity}s that have a specific {@link Type} as
* well as the provided {@link Property} values.
@@ -80,18 +51,9 @@ public interface EntityStorage {
public ConvertingCursor<TypedEntity> search(final Optional<RyaURI> subject, Type type, Set<Property> properties) throws EntityStorageException;
/**
- * Deletes an {@link Entity} from the storage.
- *
- * @param subject -Identifies which {@link Entity} to delete. (not null)
- * @return {@code true} if something was deleted; otherwise {@code false}.
- * @throws EntityStorageException A problem occurred while deleting from the storage.
- */
- public boolean delete(RyaURI subject) throws EntityStorageException;
-
- /**
* Indicates a problem while interacting with an {@link EntityStorage}.
*/
- public static class EntityStorageException extends EntityIndexException {
+ public static class EntityStorageException extends ObjectStorageException {
private static final long serialVersionUID = 1L;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
index 1b4681d..a71d673 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
@@ -55,19 +55,19 @@ import edu.umd.cs.findbugs.annotations.NonNull;
@DefaultAnnotation(NonNull.class)
public class MongoEntityStorage implements EntityStorage {
- private static final String COLLECTION_NAME = "entity-entities";
+ protected static final String COLLECTION_NAME = "entity-entities";
private static final EntityDocumentConverter ENTITY_CONVERTER = new EntityDocumentConverter();
/**
* A client connected to the Mongo instance that hosts the Rya instance.
*/
- private final MongoClient mongo;
+ protected final MongoClient mongo;
/**
* The name of the Rya instance the {@link TypedEntity}s are for.
*/
- private final String ryaInstanceName;
+ protected final String ryaInstanceName;
/**
* Constructs an instance of {@link MongoEntityStorage}.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
index 7da9918..84b0bdc 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
@@ -40,10 +40,10 @@ import org.apache.rya.indexing.entity.model.Entity;
import org.apache.rya.indexing.entity.model.Property;
import org.apache.rya.indexing.entity.model.Type;
import org.apache.rya.indexing.entity.storage.EntityStorage;
-import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
import org.apache.rya.indexing.entity.storage.TypeStorage;
import org.apache.rya.indexing.entity.storage.TypeStorage.TypeStorageException;
import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor;
+import org.apache.rya.indexing.mongodb.IndexingException;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.mongodb.MongoSecondaryIndex;
import org.openrdf.model.URI;
@@ -98,7 +98,7 @@ public abstract class BaseEntityIndexer implements EntityIndexer, MongoSecondary
for(final Entry<RyaURI, List<RyaStatement>> entry : groupedBySubject.entrySet()) {
try {
updateEntity(entry.getKey(), entry.getValue());
- } catch (final EntityStorageException e) {
+ } catch (final IndexingException e) {
throw new IOException("Failed to update the Entity index.", e);
}
}
@@ -109,8 +109,9 @@ public abstract class BaseEntityIndexer implements EntityIndexer, MongoSecondary
*
* @param subject - The Subject of the {@link Entity} the statements are for. (not null)
* @param statements - Statements that the {@link Entity} will be updated with. (not null)
+ * @throws IndexingException If the Entity index could not be updated.
*/
- private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws EntityStorageException {
+ private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws IndexingException {
requireNonNull(subject);
requireNonNull(statements);
@@ -216,7 +217,7 @@ public abstract class BaseEntityIndexer implements EntityIndexer, MongoSecondary
return Optional.of( updated.build() );
});
- } catch (final EntityStorageException e) {
+ } catch (final IndexingException e) {
throw new IOException("Failed to update the Entity index.", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
index fb5e957..2edbe37 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
@@ -21,14 +21,13 @@ package org.apache.rya.indexing.entity.update;
import static java.util.Objects.requireNonNull;
import java.util.Optional;
-import java.util.function.Function;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.entity.model.Entity;
import org.apache.rya.indexing.entity.storage.EntityStorage;
-import org.apache.rya.indexing.entity.storage.EntityStorage.EntityAlreadyExistsException;
import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
-import org.apache.rya.indexing.entity.storage.EntityStorage.StaleUpdateException;
+import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -37,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Performs update operations over an {@link EntityStorage}.
*/
@DefaultAnnotation(NonNull.class)
-public class EntityUpdater {
+public class EntityUpdater implements DocumentUpdater<RyaURI, Entity>{
private final EntityStorage storage;
@@ -50,73 +49,30 @@ public class EntityUpdater {
this.storage = requireNonNull(storage);
}
- /**
- * Tries to updates the state of an {@link Entity} until the update succeeds
- * or a non-recoverable exception is thrown.
- *
- * @param subject - The Subject of the {@link Entity} that will be updated. (not null)
- * @param mutator - Performs the mutation on the old state of the Entity and returns
- * the new state of the Entity. (not null)
- * @throws EntityStorageException A non-recoverable error has caused the update to fail.
- */
- public void update(final RyaURI subject, final EntityMutator mutator) throws EntityStorageException {
- requireNonNull(subject);
- requireNonNull(mutator);
-
- // Fetch the current state of the Entity.
- boolean completed = false;
- while(!completed) {
- try {
- final Optional<Entity> old = storage.get(subject);
- final Optional<Entity> updated = mutator.apply(old);
+ @Override
+ public void create(final Entity newObj) throws EntityStorageException {
+ try {
+ storage.create(newObj);
+ } catch (final ObjectStorageException e) {
+ throw new EntityStorageException(e.getMessage(), e);
+ }
+ }
- final boolean doWork = updated.isPresent();
- if(doWork) {
- if(!old.isPresent()) {
- storage.create(updated.get());
- } else {
- storage.update(old.get(), updated.get());
- }
- }
- completed = true;
- } catch(final EntityAlreadyExistsException | StaleUpdateException e) {
- // These are recoverable exceptions. Try again.
- } catch(final RuntimeException e) {
- throw new EntityStorageException("Failed to update Entity with Subject '" + subject.getData() + "'.", e);
- }
+ @Override
+ public void update(final Entity old, final Entity updated) throws EntityStorageException {
+ try {
+ storage.update(old, updated);
+ } catch (final ObjectStorageException e) {
+ throw new EntityStorageException(e.getMessage(), e);
}
}
- /**
- * Implementations of this interface are used to update the state of an
- * {@link Entity} in unison with a {@link EntityUpdater}.
- * </p>
- * This table describes what the updater will do depending on if an Entity
- * exists and if an updated Entity is returned.
- * </p>
- * <table border="1px">
- * <tr><th>Entity Provided</th><th>Update Returned</th><th>Effect</th></tr>
- * <tr>
- * <td>true</td>
- * <td>true</td>
- * <td>The old Entity will be updated using the returned state.</td>
- * </tr>
- * <tr>
- * <td>true</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>true</td>
- * <td>A new Entity will be created using the returned state.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * </table>
- */
- public interface EntityMutator extends Function<Optional<Entity>, Optional<Entity>> { }
+ @Override
+ public Optional<Entity> getOld(final RyaURI key) throws EntityStorageException {
+ try {
+ return storage.get(key);
+ } catch (final ObjectStorageException e) {
+ throw new EntityStorageException(e.getMessage(), e);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
index 56070b7..2428e28 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -25,11 +25,20 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoDBRyaDAO;
+import org.apache.rya.mongodb.MongoSecondaryIndex;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.query.QueryEvaluationException;
+import com.google.common.annotations.VisibleForTesting;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
@@ -37,16 +46,9 @@ import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.QueryBuilder;
import com.mongodb.ServerAddress;
+import com.mongodb.WriteConcern;
import info.aduna.iteration.CloseableIteration;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.resolver.RyaToRdfConversions;
-import org.apache.rya.indexing.StatementConstraints;
-import org.apache.rya.mongodb.MongoConnectorFactory;
-import org.apache.rya.mongodb.MongoDBRdfConfiguration;
-import org.apache.rya.mongodb.MongoDBRyaDAO;
-import org.apache.rya.mongodb.MongoSecondaryIndex;
/**
* Secondary Indexer using MondoDB
@@ -71,15 +73,16 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
db = this.mongoClient.getDB(dbName);
collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName());
}
-
+
@Override
- public void setClient(MongoClient client){
+ public void setClient(final MongoClient client){
this.mongoClient = client;
}
- // TODO this method is only intended to be used in testing
+ @VisibleForTesting
public void initIndexer(final Configuration conf, final MongoClient client) {
- ServerAddress address = client.getAddress();
+ setClient(client);
+ final ServerAddress address = client.getAddress();
conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE, address.getHost());
conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(address.getPort()));
setConf(conf);
@@ -144,8 +147,7 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
if (isValidPredicate && (statement.getObject() instanceof Literal)) {
final DBObject obj = storageStrategy.serialize(ryaStatement);
if (obj != null) {
- final DBObject query = storageStrategy.serialize(ryaStatement);
- collection.update(query, obj, true, false);
+ collection.insert(obj, WriteConcern.ACKNOWLEDGED);
}
}
} catch (final IllegalArgumentException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java
new file mode 100644
index 0000000..7029b45
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb;
+
+/**
+ * An indexing operation over mongoDB failed to complete.
+ */
+public class IndexingException extends Exception {
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public IndexingException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public IndexingException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java
index eefcfb1..6beb6f1 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java
@@ -21,20 +21,21 @@ package org.apache.rya.indexing.mongodb.temporal;
import java.util.regex.Matcher;
-import com.mongodb.BasicDBObject;
-import com.mongodb.DBCollection;
-import com.mongodb.DBObject;
-
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.TemporalInstantRfc3339;
import org.apache.rya.indexing.TemporalInterval;
import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
/**
* Defines how time based intervals/instants are stored in MongoDB.
* <p>
* Time can be stored as the following:
- * <p>
+ * <p>
* <li><b>instant</b> {[statement], instant: TIME}</li>
* <li><b>interval</b> {[statement], start: TIME, end: TIME}</li>
* @see {@link TemporalInstantRfc3339} for how the dates are formatted.
@@ -53,16 +54,24 @@ public class TemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrate
@Override
public DBObject serialize(final RyaStatement ryaStatement) {
- final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement);
- final String objString = ryaStatement.getObject().getData();
- final Matcher match = TemporalInstantRfc3339.PATTERN.matcher(objString);
- if(match.find()) {
- final TemporalInterval date = TemporalInstantRfc3339.parseInterval(ryaStatement.getObject().getData());
- base.append(INTERVAL_START, date.getHasBeginning().getAsDateTime().toDate());
- base.append(INTERVAL_END, date.getHasEnd().getAsDateTime().toDate());
- } else {
- base.append(INSTANT, TemporalInstantRfc3339.FORMATTER.parseDateTime(objString).toDate());
- }
- return base;
+ final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement);
+ final DBObject time = getTimeValue(ryaStatement.getObject().getData());
+ base.putAll(time.toMap());
+ return base;
+ }
+
+ public DBObject getTimeValue(final String timeData) {
+ final Matcher match = TemporalInstantRfc3339.PATTERN.matcher(timeData);
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
+ if(match.find()) {
+ final TemporalInterval date = TemporalInstantRfc3339.parseInterval(timeData);
+ builder.add(INTERVAL_START, date.getHasBeginning().getAsDateTime().toDate());
+ builder.add(INTERVAL_END, date.getHasEnd().getAsDateTime().toDate());
+ } else {
+ builder.add(INSTANT, TemporalInstantRfc3339.FORMATTER.parseDateTime(timeData).toDate());
+ }
+ return builder.get();
}
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
new file mode 100644
index 0000000..0b9db13
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.update;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.apache.rya.indexing.mongodb.IndexingException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Performs an update operation on a Document in mongodb.
+ * @param <T> - The key to find the object.
+ * @param <V> - The type of object to get updated.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface DocumentUpdater<T, V> {
+ public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException {
+ requireNonNull(mutator);
+
+ // Fetch the current state of the Entity.
+ boolean completed = false;
+ while(!completed) {
+ // Read the current state and let the mutator compute the replacement state (if any).
+ final Optional<V> old = getOld(key);
+ final Optional<V> updated = mutator.apply(old);
+
+ final boolean doWork = updated.isPresent();
+ if(doWork) {
+ if(!old.isPresent()) {
+ create(updated.get());
+ } else {
+ update(old.get(), updated.get());
+ }
+ }
+ completed = true;
+ }
+ }
+
+ Optional<V> getOld(T key) throws IndexingException;
+
+ void create(final V newObj) throws IndexingException;
+
+ void update(final V old, final V updated) throws IndexingException;
+
+ /**
+ * Implementations of this interface are used to update the state of a
+ * {@code V} in unison with a {@link DocumentUpdater}.
+ * <p>
+ * This table describes what the updater will do depending on whether the object
+ * exists and whether an updated object is returned.
+ * <p>
+ * <table border="1px">
+ * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr>
+ * <tr>
+ * <td>true</td>
+ * <td>true</td>
+ * <td>The old Object will be updated using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>true</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>true</td>
+ * <td>A new Object will be created using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * </table>
+ */
+ public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
new file mode 100644
index 0000000..10feb0d
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.update;
+
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.mongodb.IndexingException;
+
+/**
+ * Stores and provides access to objects of type T.
+ * @param <T> - The type of object to store/access.
+ */
+public interface RyaObjectStorage<T> {
+
+ /**
+ * Creates a new {@code T} within the storage. The new object's subject must be unique.
+ *
+ * @param doc - The {@code T} to create. (not null)
+ * @throws ObjectAlreadyExistsException An Object could not be created because one already exists for the Subject.
+ * @throws ObjectStorageException A problem occurred while creating the Object.
+ */
+ public void create(T doc) throws ObjectAlreadyExistsException, ObjectStorageException;
+
+ /**
+ * Get an Object from the storage by its subject.
+ *
+ * @param subject - Identifies which Object to get. (not null)
+ * @return The Object if one exists for the subject.
+ * @throws ObjectStorageException A problem occurred while fetching the Object from the storage.
+ */
+ public Optional<T> get(RyaURI subject) throws ObjectStorageException;
+
+ /**
+ * Update the state of a {@code T}.
+ *
+ * @param old - The Object the changes were applied to. (not null)
+ * @param updated - The updated Object to store. (not null)
+ * @throws StaleUpdateException The {@code old} Object does not match any that are stored.
+ * @throws ObjectStorageException A problem occurred while updating the Object within the storage.
+ */
+ public void update(T old, T updated) throws StaleUpdateException, ObjectStorageException;
+
+ /**
+ * Deletes a {@code T} from the storage.
+ *
+ * @param subject - Identifies which {@code T} to delete. (not null)
+ * @return {@code true} if something was deleted; otherwise {@code false}.
+ * @throws ObjectStorageException A problem occurred while deleting from the storage.
+ */
+ public boolean delete(RyaURI subject) throws ObjectStorageException;
+
+ /**
+ * Indicates a problem while interacting with an {@link RyaObjectStorage}.
+ */
+ public static class ObjectStorageException extends IndexingException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public ObjectStorageException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public ObjectStorageException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * A {@code T} could not be created because one already exists for the Subject.
+ */
+ public static class ObjectAlreadyExistsException extends ObjectStorageException {
+ private static final long serialVersionUID = 1L;
+
+ public ObjectAlreadyExistsException(final String message) {
+ super(message);
+ }
+
+ public ObjectAlreadyExistsException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * An object could not be updated because the old state does not
+ * match the current state.
+ */
+ public static class StaleUpdateException extends ObjectStorageException {
+ private static final long serialVersionUID = 1L;
+
+ public StaleUpdateException(final String message) {
+ super(message);
+ }
+
+ public StaleUpdateException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java
index d271ba0..5d26bc0 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java
@@ -50,7 +50,7 @@ public class MongoEntityStorageIT extends MongoITBase {
private static final String RYA_INSTANCE_NAME = "testInstance";
@Test
- public void create_and_get() throws EntityStorageException {
+ public void create_and_get() throws Exception {
// An Entity that will be stored.
final Entity entity = Entity.builder()
.setSubject(new RyaURI("urn:GTIN-14/00012345600012"))
@@ -71,7 +71,7 @@ public class MongoEntityStorageIT extends MongoITBase {
}
@Test
- public void can_not_create_with_same_subject() throws EntityStorageException {
+ public void can_not_create_with_same_subject() throws Exception {
// A Type that will be stored.
final Entity entity = Entity.builder()
.setSubject(new RyaURI("urn:GTIN-14/00012345600012"))
@@ -95,7 +95,7 @@ public class MongoEntityStorageIT extends MongoITBase {
}
@Test
- public void get_noneExisting() throws EntityStorageException {
+ public void get_noneExisting() throws Exception {
// Get a Type that hasn't been created.
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
final Optional<Entity> storedEntity = storage.get(new RyaURI("urn:GTIN-14/00012345600012"));
@@ -105,7 +105,7 @@ public class MongoEntityStorageIT extends MongoITBase {
}
@Test
- public void delete() throws EntityStorageException {
+ public void delete() throws Exception {
// An Entity that will be stored.
final Entity entity = Entity.builder()
.setSubject(new RyaURI("urn:GTIN-14/00012345600012"))
@@ -126,7 +126,7 @@ public class MongoEntityStorageIT extends MongoITBase {
}
@Test
- public void delete_nonExisting() throws EntityStorageException {
+ public void delete_nonExisting() throws Exception {
// Delete an Entity that has not been created.
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
final boolean deleted = storage.delete( new RyaURI("urn:GTIN-14/00012345600012") );
@@ -305,7 +305,7 @@ public class MongoEntityStorageIT extends MongoITBase {
}
@Test
- public void update() throws EntityStorageException {
+ public void update() throws Exception {
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
// Store Alice in the repository.
@@ -338,7 +338,7 @@ public class MongoEntityStorageIT extends MongoITBase {
}
@Test(expected = StaleUpdateException.class)
- public void update_stale() throws EntityStorageException {
+ public void update_stale() throws Exception {
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
// Store Alice in the repository.
@@ -370,7 +370,7 @@ public class MongoEntityStorageIT extends MongoITBase {
}
@Test(expected = EntityStorageException.class)
- public void update_differentSubjects() throws StaleUpdateException, EntityStorageException {
+ public void update_differentSubjects() throws Exception {
// Two objects that do not have the same Subjects.
final Entity old = Entity.builder()
.setSubject( new RyaURI("urn:SSN/111-11-1111") )
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java
index dd6ea40..8d4486f 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java
@@ -28,6 +28,8 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import org.apache.rya.indexing.geotemporal.GeoTemporalOptimizer;
+import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer;
import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer;
import org.openrdf.model.URI;
@@ -46,6 +48,7 @@ public class OptionalConfigUtils extends ConfigUtils {
public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions";
public static final String USE_GEO = "sc.use_geo";
+ public static final String USE_GEOTEMPORAL = "sc.use_geotemporal";
public static final String USE_FREETEXT = "sc.use_freetext";
public static final String USE_TEMPORAL = "sc.use_temporal";
public static final String USE_ENTITY = "sc.use_entity";
@@ -67,6 +70,10 @@ public class OptionalConfigUtils extends ConfigUtils {
return conf.getBoolean(USE_GEO, false);
}
+ public static boolean getUseGeoTemporal(final Configuration conf) {
+ return conf.getBoolean(USE_GEOTEMPORAL, false);
+ }
+
/**
* Retrieves the value for the geo indexer type from the config.
* @param conf the {@link Configuration}.
@@ -83,11 +90,14 @@ public class OptionalConfigUtils extends ConfigUtils {
boolean useFilterIndex = false;
ConfigUtils.setIndexers(conf);
- for (final String index : conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS)){
- indexList.add(index);
- }
- for (final String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){
- optimizers.add(optimizer);
+ final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS);
+ if(existingIndexers != null ) {
+ for (final String index : existingIndexers) {
+ indexList.add(index);
+ }
+ for (final String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){
+ optimizers.add(optimizer);
+ }
}
final GeoIndexerType geoIndexerType = getGeoIndexerType(conf);
@@ -102,6 +112,11 @@ public class OptionalConfigUtils extends ConfigUtils {
}
useFilterIndex = true;
}
+
+ if (getUseGeoTemporal(conf)) {
+ indexList.add(MongoGeoTemporalIndexer.class.getName());
+ optimizers.add(GeoTemporalOptimizer.class.getName());
+ }
} else {
if (getUseGeo(conf)) {
if (geoIndexerType == null) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
index f77e726..d00b849 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
@@ -4,6 +4,13 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IteratorFactory;
+import org.apache.rya.indexing.SearchFunction;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.query.BindingSet;
@@ -36,13 +43,6 @@ import com.vividsolutions.jts.io.WKTReader;
import info.aduna.iteration.CloseableIteration;
-import org.apache.rya.indexing.GeoConstants;
-import org.apache.rya.indexing.GeoIndexer;
-import org.apache.rya.indexing.IndexingExpr;
-import org.apache.rya.indexing.IteratorFactory;
-import org.apache.rya.indexing.SearchFunction;
-import org.apache.rya.indexing.StatementConstraints;
-import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
//Indexing Node for geo expressions to be inserted into execution plan
//to delegate geo portion of query to geo index
@@ -116,7 +116,7 @@ public class GeoTupleSet extends ExternalTupleSet {
final URI funcURI = filterInfo.getFunction();
- final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf).getSearchFunction(funcURI);
+ final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf, geoIndexer).getSearchFunction(funcURI);
if(filterInfo.getArguments().length > 1) {
throw new IllegalArgumentException("Index functions do not support more than two arguments.");
}
@@ -130,14 +130,17 @@ public class GeoTupleSet extends ExternalTupleSet {
//returns appropriate search function for a given URI
//search functions used in GeoMesaGeoIndexer to access index
- public class GeoSearchFunctionFactory {
+ public static class GeoSearchFunctionFactory {
Configuration conf;
private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
- public GeoSearchFunctionFactory(final Configuration conf) {
+ private final GeoIndexer geoIndexer;
+
+ public GeoSearchFunctionFactory(final Configuration conf, final GeoIndexer geoIndexer) {
this.conf = conf;
+ this.geoIndexer = geoIndexer;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
index 7069d73..8b2ebc3 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
@@ -65,7 +65,7 @@ public class GeoMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
public abstract String getKeyword();
}
- static class GeoQuery {
+ public static class GeoQuery {
private final GeoQueryType queryType;
private final Geometry geo;
@@ -140,7 +140,7 @@ public class GeoMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
}
}
- private List<double[]> getCorrespondingPoints(final Geometry geo){
+ public List<double[]> getCorrespondingPoints(final Geometry geo){
final List<double[]> points = new ArrayList<double[]>();
for (final Coordinate coord : geo.getCoordinates()){
points.add(new double[] {