You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/06/14 18:50:17 UTC
[5/5] incubator-rya git commit: RYA-239 GeoTemporal tests added
RYA-239 GeoTemporal tests added
Added tests and fixes that were needed while
writing tests. Closes #138
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/646d21b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/646d21b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/646d21b4
Branch: refs/heads/master
Commit: 646d21b4e6ed9f7e010ffa8e75b2801f48456c1e
Parents: 440a4bf
Author: isper3at <sm...@gmail.com>
Authored: Fri Mar 31 13:38:30 2017 -0400
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Wed Jun 14 14:42:45 2017 -0400
----------------------------------------------------------------------
.../apache/rya/indexing/TemporalTupleSet.java | 2 -
.../rya/indexing/accumulo/ConfigUtils.java.orig | 529 +++++++++++++++++++
.../indexing/entity/update/EntityUpdater.java | 4 +-
.../rya/indexing/mongodb/MongoDbSmartUri.java | 9 +-
.../mongodb/update/DocumentUpdater.java | 98 ----
.../mongodb/update/MongoDocumentUpdater.java | 98 ++++
.../mongodb/update/RyaObjectStorage.java | 1 +
.../GeoEnabledFilterFunctionOptimizer.java | 47 +-
.../indexing/accumulo/geo/GeoParseUtils.java | 75 ++-
.../GeoTemporalIndexSetProvider.java | 46 +-
.../geotemporal/GeoTemporalIndexer.java | 61 ++-
.../geotemporal/model/EventQueryNode.java | 121 ++++-
.../geotemporal/mongo/EventUpdater.java | 4 +-
.../GeoTemporalMongoDBStorageStrategy.java | 3 +-
.../geotemporal/mongo/MongoEventStorage.java | 5 +-
.../mongo/MongoGeoTemporalIndexer.java | 6 +-
.../geotemporal/GeoTemporalProviderTest.java | 222 ++++++++
.../geotemporal/GeoTemporalTestBase.java | 140 +++++
.../geotemporal/MongoGeoTemporalIndexIT.java | 174 ++++++
.../geotemporal/model/EventQueryNodeTest.java | 368 +++++++++++++
.../mongo/EventDocumentConverterTest.java | 64 +++
.../GeoTemporalMongoDBStorageStrategyTest.java | 469 ++++++++++++++++
.../mongo/MongoEventStorageTest.java | 197 +++++++
.../mongo/MongoGeoTemporalIndexerIT.java | 126 +++++
.../indexing/geotemporal/mongo/MongoITBase.java | 81 +++
25 files changed, 2730 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
index 808afdf..7cb4e6c 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
@@ -126,10 +126,8 @@ public class TemporalTupleSet extends ExternalTupleSet {
public static class TemporalSearchFunctionFactory {
private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
private final TemporalIndexer temporalIndexer;
- Configuration conf;
public TemporalSearchFunctionFactory(final Configuration conf, final TemporalIndexer temporalIndexer) {
- this.conf = conf;
this.temporalIndexer = temporalIndexer;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
new file mode 100644
index 0000000..9311200
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.utils.ConnectorFactory;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.indexing.FilterFunctionOptimizer;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+import org.apache.rya.indexing.accumulo.entity.EntityOptimizer;
+import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import org.apache.rya.indexing.accumulo.freetext.LuceneTokenizer;
+import org.apache.rya.indexing.accumulo.freetext.Tokenizer;
+import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.apache.rya.indexing.entity.EntityIndexOptimizer;
+import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
+import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
+import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizer;
+import org.apache.rya.indexing.statement.metadata.matching.StatementMetadataOptimizer;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.URIImpl;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+/**
+ * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
+ * This class will soon be deprecated. Use the installer for the set methods and {@link RyaDetails} for the get methods.
+ * New code must separate parameters that are set at Rya install time from that which is specific to the client.
+ * Also Accumulo index tables are pushed down to the implementation and not configured in conf.
+ */
+public class ConfigUtils {
+ private static final Logger logger = Logger.getLogger(ConfigUtils.class);
+
+ /**
+ * @deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_TBL_PREFIX} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_TBL_PREFIX = RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_INSTANCE} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_INSTANCE = AccumuloRdfConfiguration.CLOUDBASE_INSTANCE;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_ZOOKEEPERS} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_ZOOKEEPERS = AccumuloRdfConfiguration.CLOUDBASE_ZOOKEEPERS;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_USER} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_USER = AccumuloRdfConfiguration.CLOUDBASE_USER;
+
+ /**
+ * @deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_PASSWORD} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_PASSWORD = AccumuloRdfConfiguration.CLOUDBASE_PASSWORD;
+ /**
+ * @deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_QUERY_AUTH} instead.
+ */
+ @Deprecated
+ public static final String CLOUDBASE_AUTHS = RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH;
+
+ public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads";
+ public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency";
+ public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory";
+
+ public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit";
+
+ public static final String USE_FREETEXT = "sc.use_freetext";
+ public static final String USE_TEMPORAL = "sc.use_temporal";
+ public static final String USE_ENTITY = "sc.use_entity";
+ public static final String USE_PCJ = "sc.use_pcj";
+ public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
+ public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
+
+ public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+ public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
+ public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
+ public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+
+ public static final String USE_MOCK_INSTANCE = AccumuloRdfConfiguration.USE_MOCK_INSTANCE;
+
+ public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
+
+ private static final int WRITER_MAX_WRITE_THREADS = 1;
+ private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE;
+ private static final long WRITER_MAX_MEMORY = 10000L;
+
+ public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan";
+
+ public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates";
+ public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text";
+ public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term";
+
+ public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class";
+
+ public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
+
+ public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
+
+ public static final String USE_MONGO = "sc.useMongo";
+
+ public static boolean isDisplayQueryPlan(final Configuration conf) {
+ return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
+ }
+
+ /**
+ * Get a value from the configuration file and throw an exception if the
+ * value does not exist.
+ *
+ * @param conf
+ * @param key
+ * @return
+ */
+ private static String getStringCheckSet(final Configuration conf, final String key) {
+ final String value = conf.get(key);
+ requireNonNull(value, key + " not set");
+ return value;
+ }
+
+ /**
+ * @param conf
+ * @param tablename
+ * @return if the table was created
+ * @throws AccumuloException
+ * @throws AccumuloSecurityException
+ * @throws TableExistsException
+ */
+ public static boolean createTableIfNotExists(final Configuration conf, final String tablename)
+ throws AccumuloException, AccumuloSecurityException, TableExistsException {
+ final TableOperations tops = getConnector(conf).tableOperations();
+ if (!tops.exists(tablename)) {
+ logger.info("Creating table: " + tablename);
+ tops.create(tablename);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Lookup the table name prefix in the conf and throw an error if it is
+ * null. In the future, get the table prefix from RyaDetails -- the Rya instance name
+ * -- also getting info from the RyaDetails should happen within
+ * RyaSailFactory and not ConfigUtils.
+ *
+ * @param conf
+ * Rya configuration map where it extracts the prefix (instance
+ * name)
+ * @return index table prefix corresponding to this Rya instance
+ */
+ public static String getTablePrefix(final Configuration conf) {
+ final String tablePrefix;
+ tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+ requireNonNull(tablePrefix,
+ "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. Cannot generate table name.");
+ return tablePrefix;
+ }
+
+ public static int getFreeTextTermLimit(final Configuration conf) {
+ return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
+ }
+
+ public static Set<URI> getFreeTextPredicates(final Configuration conf) {
+ return getPredicates(conf, FREETEXT_PREDICATES_LIST);
+ }
+
+ public static Set<URI> getGeoPredicates(final Configuration conf) {
+ return getPredicates(conf, GEO_PREDICATES_LIST);
+ }
+
+ /**
+ * Used for indexing statements about date & time instances and intervals.
+ *
+ * @param conf
+ * @return Set of predicate URI's whose objects should be date time
+ * literals.
+ */
+ public static Set<URI> getTemporalPredicates(final Configuration conf) {
+ return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
+ }
+
+ protected static Set<URI> getPredicates(final Configuration conf, final String confName) {
+ final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
+ final Set<URI> predicates = new HashSet<>();
+ for (final String prediateString : validPredicateStrings) {
+ predicates.add(new URIImpl(prediateString));
+ }
+ return predicates;
+ }
+
+ public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
+ final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
+ return ReflectionUtils.newInstance(c, conf);
+ }
+
+ public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf)
+ throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+ final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+ final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
+ return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
+ }
+
+ public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf)
+ throws AccumuloException, AccumuloSecurityException {
+ final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+ final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+ final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
+ return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
+ }
+
+ public static Scanner createScanner(final String tablename, final Configuration conf)
+ throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final Authorizations auths = ConfigUtils.getAuthorizations(conf);
+ return connector.createScanner(tablename, auths);
+
+ }
+
+ public static BatchScanner createBatchScanner(final String tablename, final Configuration conf)
+ throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final Authorizations auths = ConfigUtils.getAuthorizations(conf);
+ Integer numThreads = null;
+ if (conf instanceof RdfCloudTripleStoreConfiguration) {
+ numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
+ } else {
+ numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2);
+ }
+ return connector.createBatchScanner(tablename, auths, numThreads);
+ }
+
+ public static int getWriterMaxWriteThreads(final Configuration conf) {
+ return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
+ }
+
+ public static long getWriterMaxLatency(final Configuration conf) {
+ return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY);
+ }
+
+ public static long getWriterMaxMemory(final Configuration conf) {
+ return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
+ }
+
+ public static String getUsername(final JobContext job) {
+ return getUsername(job.getConfiguration());
+ }
+
+ /**
+ * Get the Accumulo username from the configuration object that is meant to
+ * be used when connecting a {@link Connector} to Accumulo.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return The username if one could be found; otherwise {@code null}.
+ */
+ public static String getUsername(final Configuration conf) {
+ return new AccumuloRdfConfiguration(conf).getUsername();
+ }
+
+ public static Authorizations getAuthorizations(final JobContext job) {
+ return getAuthorizations(job.getConfiguration());
+ }
+
+ public static Authorizations getAuthorizations(final Configuration conf) {
+ final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "");
+ if (authString.isEmpty()) {
+ return new Authorizations();
+ }
+ return new Authorizations(authString.split(","));
+ }
+
+ public static Instance getInstance(final JobContext job) {
+ return getInstance(job.getConfiguration());
+ }
+
+ /**
+ * Create an {@link Instance} that may be used to create {@link Connector}s
+ * to Accumulo. If the configuration has the {@link #USE_MOCK_INSTANCE} flag
+ * set, then the instance will be a {@link MockInstance} instead of a
+ * Zookeeper backed instance.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return The {@link Instance} that may be used to connect to Accumulo.
+ */
+ public static Instance getInstance(final Configuration conf) {
+ // Pull out the Accumulo specific configuration values.
+ final AccumuloRdfConfiguration accConf = new AccumuloRdfConfiguration(conf);
+ String instanceName = accConf.getInstanceName();
+ String zoookeepers = accConf.getZookeepers();
+
+ // Create a mock Instance if the mock flag is set.
+ if (useMockInstance(conf)) {
+ return new MockInstance(instanceName);
+ }
+
+ // Otherwise create an Instance to a Zookeeper managed instance of Accumulo.
+ return new ZooKeeperInstance(instanceName, zoookeepers);
+ }
+
+ public static String getPassword(final JobContext job) {
+ return getPassword(job.getConfiguration());
+ }
+
+ /**
+ * Get the Accumulo password from the configuration object that is meant to
+ * be used when connecting a {@link Connector} to Accumulo.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return The password if one could be found; otherwise an empty string.
+ */
+ public static String getPassword(final Configuration conf) {
+ return new AccumuloRdfConfiguration(conf).getPassword();
+ }
+
+ public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(job.getConfiguration());
+ }
+
+ /**
+ * Create an Accumulo {@link Connector} using the configured connection information.
+ * If the connection information points to a mock instance of Accumulo, then the
+ * {@link #USE_MOCK_INSTANCE} flag must be set.
+ *
+ * @param conf - Configures how the connector will be built. (not null)
+ * @return A {@link Connector} that may be used to interact with the configured Accumulo instance.
+ * @throws AccumuloException The connector couldn't be created because of an Accumulo problem.
+ * @throws AccumuloSecurityException The connector couldn't be created because of an Accumulo security violation.
+ */
+ public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
+ return ConnectorFactory.connect( new AccumuloRdfConfiguration(conf) );
+ }
+
+ /**
+ * Indicates that a Mock instance of Accumulo is being used to back the Rya instance.
+ *
+ * @param conf - The configuration object that will be interrogated. (not null)
+ * @return {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}.
+ */
+ public static boolean useMockInstance(final Configuration conf) {
+ return new AccumuloRdfConfiguration(conf).useMockInstance();
+ }
+
+ protected static int getNumPartitions(final Configuration conf) {
+ return conf.getInt(NUM_PARTITIONS, 25);
+ }
+
+ public static int getFreeTextDocNumPartitions(final Configuration conf) {
+ return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
+ }
+
+ public static int getFreeTextTermNumPartitions(final Configuration conf) {
+ return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
+ }
+
+ public static boolean getUseFreeText(final Configuration conf) {
+ return conf.getBoolean(USE_FREETEXT, false);
+ }
+
+ public static boolean getUseTemporal(final Configuration conf) {
+ return conf.getBoolean(USE_TEMPORAL, false);
+ }
+
+ public static boolean getUseEntity(final Configuration conf) {
+ return conf.getBoolean(USE_ENTITY, false);
+ }
+
+ public static boolean getUsePCJ(final Configuration conf) {
+ return conf.getBoolean(USE_PCJ, false);
+ }
+
+ public static boolean getUseOptimalPCJ(final Configuration conf) {
+ return conf.getBoolean(USE_OPTIMAL_PCJ, false);
+ }
+
+ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
+ return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
+ }
+
+
+ /**
+ * @return The name of the Fluo Application this instance of RYA is using to
+ * incrementally update PCJs.
+ */
+ // TODO delete this eventually and use Details table
+ public Optional<String> getFluoAppName(final Configuration conf) {
+ return Optional.fromNullable(conf.get(FLUO_APP_NAME));
+ }
+
+
+ public static boolean getUseMongo(final Configuration conf) {
+ return conf.getBoolean(USE_MONGO, false);
+ }
+
+
+ public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
+
+ final List<String> indexList = Lists.newArrayList();
+ final List<String> optimizers = Lists.newArrayList();
+
+ boolean useFilterIndex = false;
+
+ if (ConfigUtils.getUseMongo(conf)) {
+ if (getUseFreeText(conf)) {
+ indexList.add(MongoFreeTextIndexer.class.getName());
+ useFilterIndex = true;
+ }
+
+ if (getUseEntity(conf)) {
+ indexList.add(MongoEntityIndexer.class.getName());
+ optimizers.add(EntityIndexOptimizer.class.getName());
+ }
+
+ if (getUseTemporal(conf)) {
+ indexList.add(MongoTemporalIndexer.class.getName());
+ useFilterIndex = true;
+ }
+ } else {
+ if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
+ conf.setPcjOptimizer(PCJOptimizer.class);
+ }
+
+ if (getUsePcjUpdaterIndex(conf)) {
+ indexList.add(PrecomputedJoinIndexer.class.getName());
+ }
+
+ if (getUseFreeText(conf)) {
+ indexList.add(AccumuloFreeTextIndexer.class.getName());
+ useFilterIndex = true;
+ }
+
+ if (getUseTemporal(conf)) {
+ indexList.add(AccumuloTemporalIndexer.class.getName());
+ useFilterIndex = true;
+ }
+
+ if (getUseEntity(conf)) {
+ indexList.add(EntityCentricIndex.class.getName());
+ optimizers.add(EntityOptimizer.class.getName());
+ }
+ }
+
+ if (useFilterIndex) {
+ optimizers.add(FilterFunctionOptimizer.class.getName());
+ }
+
+ if (conf.getUseStatementMetadata()) {
+ optimizers.add(StatementMetadataOptimizer.class.getName());
+ }
+
+<<<<<<< HEAD
+ conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[] {}));
+ conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[] {}));
+ }
+}
+=======
+ final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS);
+ if(existingIndexers != null ) {
+ for(final String idx : existingIndexers) {
+ indexList.add(idx);
+ }
+ }
+
+ final String[] existingOptimizers = conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS);
+ if(existingOptimizers != null ) {
+ for(final String opt : existingOptimizers) {
+ optimizers.add(opt);
+ }
+ }
+
+ conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
+ conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
+ }
+
+
+
+}
+>>>>>>> RYA-236 Changes to other indexers
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
index 2edbe37..91f1146 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
@@ -26,7 +26,7 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.entity.model.Entity;
import org.apache.rya.indexing.entity.storage.EntityStorage;
import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
-import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.MongoDocumentUpdater;
import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Performs update operations over an {@link EntityStorage}.
*/
@DefaultAnnotation(NonNull.class)
-public class EntityUpdater implements DocumentUpdater<RyaURI, Entity>{
+public class EntityUpdater implements MongoDocumentUpdater<RyaURI, Entity>{
private final EntityStorage storage;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
index 9fdfad6..cbc8796 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java
@@ -36,6 +36,7 @@ import org.apache.rya.indexing.entity.storage.EntityStorage;
import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor;
import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import org.apache.rya.indexing.smarturi.SmartUriAdapter;
import org.apache.rya.indexing.smarturi.SmartUriException;
import org.apache.rya.indexing.smarturi.SmartUriStorage;
@@ -74,7 +75,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
// Create it.
try {
entityStorage.create(entity);
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to create entity storage", e);
}
}
@@ -86,7 +87,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
// Create it.
try {
entityStorage.create(entity);
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to create entity storage", e);
}
}
@@ -98,7 +99,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
// Update it.
try {
entityStorage.update(oldEntity, updatedEntity);
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to update entity", e);
}
}
@@ -111,7 +112,7 @@ public class MongoDbSmartUri implements SmartUriStorage {
try {
final Optional<Entity> resultEntity = entityStorage.get(subject);
return resultEntity.get();
- } catch (final EntityStorageException e) {
+ } catch (final ObjectStorageException e) {
throw new SmartUriException("Failed to query entity storage", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
deleted file mode 100644
index 0b9db13..0000000
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.mongodb.update;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Optional;
-import java.util.function.Function;
-
-import org.apache.rya.indexing.mongodb.IndexingException;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Performs an update operation on a Document in mongodb.
- * @param <T> - The key to find the object.
- * @param <V> - The type of object to get updated.
- */
-@DefaultAnnotation(NonNull.class)
-public interface DocumentUpdater<T, V> {
- public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException {
- requireNonNull(mutator);
-
- // Fetch the current state of the Entity.
- boolean completed = false;
- while(!completed) {
- //this cast is safe since the mutator interface is defined below to use Optional<V>
- final Optional<V> old = getOld(key);
- final Optional<V> updated = mutator.apply(old);
-
- final boolean doWork = updated.isPresent();
- if(doWork) {
- if(!old.isPresent()) {
- create(updated.get());
- } else {
- update(old.get(), updated.get());
- }
- }
- completed = true;
- }
- }
-
- Optional<V> getOld(T key) throws IndexingException;
-
- void create(final V newObj) throws IndexingException;
-
- void update(final V old, final V updated) throws IndexingException;
-
- /**
- * Implementations of this interface are used to update the state of a
- * {@link DocumentUpdater#V} in unison with a {@link DocumentUpdater}.
- * </p>
- * This table describes what the updater will do depending on if the object
- * exists and if an updated object is returned.
- * </p>
- * <table border="1px">
- * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr>
- * <tr>
- * <td>true</td>
- * <td>true</td>
- * <td>The old Object will be updated using the returned state.</td>
- * </tr>
- * <tr>
- * <td>true</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>true</td>
- * <td>A new Object will be created using the returned state.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * </table>
- */
- public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java
new file mode 100644
index 0000000..a7a3eb9
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.update;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.apache.rya.indexing.mongodb.IndexingException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Performs an update operation on a Document in mongodb.
+ * @param <T> - The key to find the object.
+ * @param <V> - The type of object to get updated.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface MongoDocumentUpdater<T, V> {
+ public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException {
+ requireNonNull(mutator);
+
+ // Fetch the current state of the Entity.
+ boolean completed = false;
+ while(!completed) {
+ //read the current state for the key and let the mutator decide what, if anything, should be stored
+ final Optional<V> old = getOld(key);
+ final Optional<V> updated = mutator.apply(old);
+
+ final boolean doWork = updated.isPresent();
+ if(doWork) {
+ if(!old.isPresent()) {
+ create(updated.get());
+ } else {
+ update(old.get(), updated.get());
+ }
+ }
+ completed = true;
+ }
+ }
+
+ Optional<V> getOld(T key) throws IndexingException;
+
+ void create(final V newObj) throws IndexingException;
+
+ void update(final V old, final V updated) throws IndexingException;
+
+ /**
+ * Implementations of this interface are used to update the state of an
+ * object of type {@code V} in unison with a {@link MongoDocumentUpdater}.
+ * <p>
+ * This table describes what the updater will do depending on if the object
+ * exists and if an updated object is returned.
+ * </p>
+ * <table border="1px">
+ * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr>
+ * <tr>
+ * <td>true</td>
+ * <td>true</td>
+ * <td>The old Object will be updated using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>true</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>true</td>
+ * <td>A new Object will be created using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * </table>
+ */
+ public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
index 10feb0d..bd04368 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
@@ -25,6 +25,7 @@ import org.apache.rya.indexing.mongodb.IndexingException;
/**
* Stores and provides access to objects of type T.
+ * The RyaURI subject is the primary storage key used.
* @param <T> - The type of object to store/access.
*/
public interface RyaObjectStorage<T> {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
index d773831..bf6b632 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java
@@ -34,6 +34,18 @@ import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import org.apache.rya.indexing.accumulo.freetext.FreeTextTupleSet;
+import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
+import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
+import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer;
+import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
import org.geotools.feature.SchemaException;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
@@ -52,25 +64,12 @@ import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.ValueConstant;
-import org.openrdf.query.algebra.ValueExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.evaluation.QueryOptimizer;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import com.google.common.collect.Lists;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import org.apache.rya.indexing.accumulo.freetext.FreeTextTupleSet;
-import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
-import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
-import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
-import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer;
-import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
-
public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Configurable {
private static final Logger LOG = Logger.getLogger(GeoEnabledFilterFunctionOptimizer.class);
private final ValueFactory valueFactory = new ValueFactoryImpl();
@@ -232,7 +231,7 @@ public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Config
final URI fnUri = valueFactory.createURI(call.getURI());
final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(fnUri, call.getArgs());
if (resultVar != null && resultVar.getName().equals(matchVar)) {
- addFilter(valueFactory.createURI(call.getURI()), extractArguments(matchVar, call));
+ addFilter(valueFactory.createURI(call.getURI()), GeoParseUtils.extractArguments(matchVar, call));
if (call.getParentNode() instanceof Filter || call.getParentNode() instanceof And || call.getParentNode() instanceof LeftJoin) {
call.replaceWith(new ValueConstant(valueFactory.createLiteral(true)));
} else {
@@ -241,26 +240,6 @@ public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Config
}
}
- private Value[] extractArguments(final String matchName, final FunctionCall call) {
- final Value args[] = new Value[call.getArgs().size() - 1];
- int argI = 0;
- for (int i = 0; i != call.getArgs().size(); ++i) {
- final ValueExpr arg = call.getArgs().get(i);
- if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
- continue;
- }
- if (arg instanceof ValueConstant) {
- args[argI] = ((ValueConstant)arg).getValue();
- } else if (arg instanceof Var && ((Var)arg).hasValue()) {
- args[argI] = ((Var)arg).getValue();
- } else {
- throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
- }
- ++argI;
- }
- return args;
- }
-
@Override
public void meet(final Filter filter) {
//First visit children, then condition (reverse of default):
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
index ffba225..e8fbc3d 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java
@@ -14,9 +14,9 @@ import javax.xml.parsers.ParserConfigurationException;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,21 +27,25 @@ import javax.xml.parsers.ParserConfigurationException;
import org.apache.log4j.Logger;
+import org.apache.rya.indexing.GeoConstants;
import org.geotools.gml3.GMLConfiguration;
import org.geotools.xml.Parser;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
import org.xml.sax.SAXException;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.io.WKTReader;
-import org.apache.rya.indexing.GeoConstants;
-
public class GeoParseUtils {
static final Logger logger = Logger.getLogger(GeoParseUtils.class);
- /**
+ /**
* @deprecated Not needed since geo literals may be WKT or GML.
*
* This method warns on a condition that must already be tested. Replaced by
@@ -50,41 +54,41 @@ public class GeoParseUtils {
* and getLiteral(statement).getDatatype()
*/
@Deprecated
- public static String getWellKnownText(Statement statement) throws ParseException {
- Literal lit = getLiteral(statement);
+ public static String getWellKnownText(final Statement statement) throws ParseException {
+ final Literal lit = getLiteral(statement);
if (!GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) {
logger.warn("Literal is not of type " + GeoConstants.XMLSCHEMA_OGC_WKT + ": " + statement.toString());
}
return lit.getLabel().toString();
}
- public static Literal getLiteral(Statement statement) throws ParseException {
- org.openrdf.model.Value v = statement.getObject();
+ public static Literal getLiteral(final Statement statement) throws ParseException {
+ final org.openrdf.model.Value v = statement.getObject();
if (!(v instanceof Literal)) {
throw new ParseException("Statement does not contain Literal: " + statement.toString());
}
- Literal lit = (Literal) v;
+ final Literal lit = (Literal) v;
return lit;
}
/**
* Parse GML/wkt literal to Geometry
- *
+ *
* @param statement
* @return
* @throws ParseException
- * @throws ParserConfigurationException
- * @throws SAXException
- * @throws IOException
+ * @throws ParserConfigurationException
+ * @throws SAXException
+ * @throws IOException
*/
- public static Geometry getGeometry(Statement statement) throws ParseException {
+ public static Geometry getGeometry(final Statement statement) throws ParseException {
// handle GML or WKT
- Literal lit = getLiteral(statement);
+ final Literal lit = getLiteral(statement);
if (GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) {
final String wkt = lit.getLabel().toString();
return (new WKTReader()).read(wkt);
} else if (GeoConstants.XMLSCHEMA_OGC_GML.equals(lit.getDatatype())) {
- String gml = lit.getLabel().toString();
+ final String gml = lit.getLabel().toString();
try {
return getGeometryGml(gml);
} catch (IOException | SAXException | ParserConfigurationException e) {
@@ -102,18 +106,43 @@ public class GeoParseUtils {
* @throws SAXException
* @throws ParserConfigurationException
*/
- public static Geometry getGeometryGml(String gmlString) throws IOException, SAXException, ParserConfigurationException {
- Reader reader = new StringReader(gmlString);
- GMLConfiguration gmlConfiguration = new GMLConfiguration();
- Parser gmlParser = new Parser(gmlConfiguration);
+ public static Geometry getGeometryGml(final String gmlString) throws IOException, SAXException, ParserConfigurationException {
+ final Reader reader = new StringReader(gmlString);
+ final GMLConfiguration gmlConfiguration = new GMLConfiguration();
+ final Parser gmlParser = new Parser(gmlConfiguration);
// gmlParser.setStrict(false); // attempt at allowing deprecated elements, but no.
// gmlParser.setValidating(false);
final Geometry geometry = (Geometry) gmlParser.parse(reader);
// This sometimes gets populated with the SRS/CRS: geometry.getUserData()
- // Always returns 0 : geometry.getSRID()
+ // Always returns 0 : geometry.getSRID()
//TODO geometry.setUserData(some default CRS); OR geometry.setSRID(some default CRS)
-
+
return geometry;
}
+ /**
+ * Extracts the arguments used in a {@link FunctionCall}.
+ * @param matchName - The variable name to match to arguments used in the {@link FunctionCall}.
+ * @param call - The {@link FunctionCall} to match against.
+ * @return - The {@link Value}s matched.
+ */
+ public static Value[] extractArguments(final String matchName, final FunctionCall call) {
+ final Value args[] = new Value[call.getArgs().size() - 1];
+ int argI = 0;
+ for (int i = 0; i != call.getArgs().size(); ++i) {
+ final ValueExpr arg = call.getArgs().get(i);
+ if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
+ continue;
+ }
+ if (arg instanceof ValueConstant) {
+ args[argI] = ((ValueConstant)arg).getValue();
+ } else if (arg instanceof Var && ((Var)arg).hasValue()) {
+ args[argI] = ((Var)arg).getValue();
+ } else {
+ throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
+ }
+ ++argI;
+ }
+ return args;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
index 38790c4..bf12f26 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java
@@ -28,22 +28,22 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import org.apache.log4j.Logger;
import org.apache.rya.indexing.IndexingExpr;
import org.apache.rya.indexing.IndexingFunctionRegistry;
import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
import org.apache.rya.indexing.external.matching.ExternalSetProvider;
import org.apache.rya.indexing.external.matching.QuerySegment;
import org.apache.rya.indexing.geotemporal.model.EventQueryNode;
+import org.apache.rya.indexing.geotemporal.model.EventQueryNode.EventQueryNodeBuilder;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
import org.openrdf.model.URI;
-import org.openrdf.model.Value;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.algebra.FunctionCall;
import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.ValueConstant;
-import org.openrdf.query.algebra.ValueExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
@@ -54,6 +54,8 @@ import com.google.common.collect.Multimap;
* Provides {@link GeoTupleSet}s.
*/
public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQueryNode> {
+ private static final Logger LOG = Logger.getLogger(GeoTemporalIndexSetProvider.class);
+
//organzied by object var. Each object is a filter, or set of filters
private Multimap<Var, IndexingExpr> filterMap;
@@ -138,13 +140,20 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
}
if(geoFilters.isPresent() && temporalFilters.isPresent() && geoPattern.isPresent() && temporalPattern.isPresent()) {
- return new EventQueryNode(eventStorage, geoPattern.get(), temporalPattern.get(), geoFilters.get(), temporalFilters.get(), usedFilters);
+ return new EventQueryNodeBuilder()
+ .setStorage(eventStorage)
+ .setGeoPattern(geoPattern.get())
+ .setTemporalPattern(temporalPattern.get())
+ .setGeoFilters(geoFilters.get())
+ .setTemporalFilters(temporalFilters.get())
+ .setUsedFilters(usedFilters)
+ .build();
} else {
return null;
}
}
- private FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) {
+ private static FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) {
FUNCTION_TYPE type = null;
for(final IndexingExpr filter : filters) {
if(type == null) {
@@ -174,7 +183,7 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
try {
filter.visit(new FilterVisitor());
} catch (final Exception e) {
- e.printStackTrace();
+ LOG.error("Failed to match the filter object.", e);
}
}
@@ -204,27 +213,7 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
private void addFilter(final FunctionCall call) {
filterURI = new URIImpl(call.getURI());
final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
- filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call)));
- }
-
- private Value[] extractArguments(final String matchName, final FunctionCall call) {
- final Value args[] = new Value[call.getArgs().size() - 1];
- int argI = 0;
- for (int i = 0; i != call.getArgs().size(); ++i) {
- final ValueExpr arg = call.getArgs().get(i);
- if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) {
- continue;
- }
- if (arg instanceof ValueConstant) {
- args[argI] = ((ValueConstant)arg).getValue();
- } else if (arg instanceof Var && ((Var)arg).hasValue()) {
- args[argI] = ((Var)arg).getValue();
- } else {
- throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI");
- }
- ++argI;
- }
- return args;
+ filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call)));
}
/**
@@ -234,13 +223,12 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue
private class FilterVisitor extends QueryModelVisitorBase<Exception> {
@Override
public void meet(final FunctionCall call) throws Exception {
-
filterURI = new URIImpl(call.getURI());
final FUNCTION_TYPE type = IndexingFunctionRegistry.getFunctionType(filterURI);
if(type == FUNCTION_TYPE.GEO || type == FUNCTION_TYPE.TEMPORAL) {
final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs());
if(objectPatterns.containsKey(objVar)) {
- filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call)));
+ filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call)));
matchedFilters.put(objVar, call);
} else {
unmatchedFilters.put(objVar, call);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
index 01b254b..cbc978b 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java
@@ -39,14 +39,50 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
*/
public abstract EventStorage getEventStorage(final Configuration conf);
- public enum GeoPolicy {
+ /**
+ * Used to indicate which geo filter functions to use in a query.
+ */
+ public static enum GeoPolicy {
+ /**
+ * The provided geo object equals the geo object where the event took place.
+ */
EQUALS(GeoConstants.GEO_SF_EQUALS),
+
+ /**
+ * The provided geo object does not share any space with the event.
+ */
DISJOINT(GeoConstants.GEO_SF_DISJOINT),
+
+ /**
+ * The provided geo object shares some amount of space with the event.
+ */
INTERSECTS(GeoConstants.GEO_SF_INTERSECTS),
+
+ /**
+ * The provided geo object shares a point with the event, but only on the edge.
+ */
TOUCHES(GeoConstants.GEO_SF_TOUCHES),
+
+ /**
+ * The provided geo object shares some, but not all space with the event.
+ */
CROSSES(GeoConstants.GEO_SF_CROSSES),
+
+ /**
+ * The provided geo object exists completely within the event.
+ */
WITHIN(GeoConstants.GEO_SF_WITHIN),
+
+ /**
+ * The event took place completely within the provided geo object.
+ */
CONTAINS(GeoConstants.GEO_SF_CONTAINS),
+
+ /**
+ * The provided geo object and the event have some but not all points in common,
+ * are of the same dimension, and the intersection of their interiors has the
+ * same dimension as the geometries themselves.
+ */
OVERLAPS(GeoConstants.GEO_SF_OVERLAPS);
private final URI uri;
@@ -69,10 +105,9 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
}
}
- String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
+ static final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
/**
- * All of the filter functions that can be used in a temporal based query.
- * <p>
+ * Used to indicate which temporal filter functions to use in a query.
*/
public enum TemporalPolicy {
/**
@@ -106,12 +141,28 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer {
INSTANT_AFTER_INTERVAL(false, new URIImpl(TEMPORAL_NS+"afterInterval")),
/**
- * The provided instant in time equals the instant the event took place.
+ * The provided instant in time equals the start of the interval in which the event took place.
*/
INSTANT_START_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasBeginningInterval")),
+
+ /**
+ * The provided instant in time equals the end of the interval in which the event took place.
+ */
INSTANT_END_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasEndInterval")),
+
+ /**
+ * The provided interval equals the interval in which the event took place.
+ */
INTERVAL_EQUALS(false, new URIImpl(TEMPORAL_NS+"intervalEquals")),
+
+ /**
+ * The provided interval is before the interval in which the event took place.
+ */
INTERVAL_BEFORE(false, new URIImpl(TEMPORAL_NS+"intervalBefore")),
+
+ /**
+ * The provided interval is after the interval in which the event took place.
+ */
INTERVAL_AFTER(false, new URIImpl(TEMPORAL_NS+"intervalAfter"));
private final boolean isInstant;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
index 6953714..104fca8 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java
@@ -29,12 +29,17 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstant;
import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.entity.query.EntityQueryNode;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.openrdf.model.Value;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.BindingSet;
@@ -60,8 +65,10 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
//Information about the subject of the patterns.
private final boolean subjectIsConstant;
- private final Optional<String> subjectConstant;
private final Optional<String> subjectVar;
+ //not final because if the subject is a variable and the evaluate() is
+ // provided a binding set that contains the subject, this optional is used.
+ private Optional<String> subjectConstant;
//since and EventQueryNode exists in a single segment, all binding names are garunteed to be assured.
private final Set<String> bindingNames;
@@ -80,7 +87,7 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
* @param entities - The {@link EventStorage} that will be searched to match
* {@link BindingSet}s when evaluating a query. (not null)
*/
- public EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException {
+ private EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException {
this.geoPattern = requireNonNull(geoPattern);
this.temporalPattern = requireNonNull(temporalPattern);
this.geoFilters = requireNonNull(geoFilters);
@@ -159,8 +166,15 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
try {
final Collection<Event> searchEvents;
final String subj;
+ //if the provided binding set has the subject already, set it to the constant subject.
+ if(!subjectConstant.isPresent() && bindings.hasBinding(subjectVar.get())) {
+ subjectConstant = Optional.of(bindings.getValue(subjectVar.get()).stringValue());
+ } else if(bindings.size() != 0) {
+ list.add(bindings);
+ }
+
// If the subject needs to be filled in, check if the subject variable is in the binding set.
- if(subjectIsConstant) {
+ if(subjectConstant.isPresent()) {
// if it is, fetch that value and then fetch the entity for the subject.
subj = subjectConstant.get();
searchEvents = eventStore.search(Optional.of(new RyaURI(subj)), Optional.of(geoFilters), Optional.of(temporalFilters));
@@ -179,7 +193,11 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
final Value temporalValue;
if(event.isInstant() && event.getInstant().isPresent()) {
- temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInstant().get().getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER));
+ final Optional<TemporalInstant> opt = event.getInstant();
+ DateTime dt = opt.get().getAsDateTime();
+ dt = dt.toDateTime(DateTimeZone.UTC);
+ final String str = dt.toString(TemporalInstantRfc3339.FORMATTER);
+ temporalValue = ValueFactoryImpl.getInstance().createLiteral(str);
} else if(event.getInterval().isPresent()) {
temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInterval().get().getAsPair());
} else {
@@ -195,9 +213,6 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
} catch (final ObjectStorageException e) {
throw new QueryEvaluationException("Failed to evaluate the binding set", e);
}
- if(bindings.size() != 0) {
- list.add(bindings);
- }
return new CollectionIteration<>(list);
}
@@ -238,15 +253,16 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
public boolean equals(final Object other) {
if(other instanceof EventQueryNode) {
final EventQueryNode otherNode = (EventQueryNode)other;
-
- return Objects.equals(subjectIsConstant, otherNode.subjectIsConstant) &&
- Objects.equals(subjectVar, otherNode.subjectVar) &&
- Objects.equals(geoFilters, otherNode.geoFilters) &&
- Objects.equals(geoPattern, otherNode.geoPattern) &&
- Objects.equals(temporalFilters, otherNode.temporalFilters) &&
- Objects.equals(temporalPattern, otherNode.temporalPattern) &&
- Objects.equals(bindingNames, otherNode.bindingNames) &&
- Objects.equals(subjectConstant, otherNode.subjectConstant);
+ return new EqualsBuilder()
+ .append(subjectIsConstant, otherNode.subjectIsConstant)
+ .append(subjectVar, otherNode.subjectVar)
+ .append(geoFilters, otherNode.geoFilters)
+ .append(geoPattern, otherNode.geoPattern)
+ .append(temporalFilters, otherNode.temporalFilters)
+ .append(temporalPattern, otherNode.temporalPattern)
+ .append(bindingNames, otherNode.bindingNames)
+ .append(subjectConstant, otherNode.subjectConstant)
+ .isEquals();
}
return false;
}
@@ -280,4 +296,77 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera
throws QueryEvaluationException {
return null;
}
+
+ /**
+ * Builder for {@link EventQueryNode}s; values never set are handed to the constructor as {@code null}.
+ */
+ public static class EventQueryNodeBuilder {
+ private EventStorage store;
+ private StatementPattern geoPattern;
+ private StatementPattern temporalPattern;
+ private Collection<IndexingExpr> geoFilters;
+ private Collection<IndexingExpr> temporalFilters;
+ private Collection<FunctionCall> usedFilters;
+
+ /**
+ * @param store - The {@link EventStorage} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setStorage(final EventStorage store) {
+ this.store = store;
+ return this;
+ }
+
+ /**
+ * @param geoPattern - The geo {@link StatementPattern} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setGeoPattern(final StatementPattern geoPattern) {
+ this.geoPattern = geoPattern;
+ return this;
+ }
+
+ /**
+ * @param temporalPattern - The temporal {@link StatementPattern} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setTemporalPattern(final StatementPattern temporalPattern) {
+ this.temporalPattern = temporalPattern;
+ return this;
+ }
+
+ /**
+ * @param geoFilters - The geo filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setGeoFilters(final Collection<IndexingExpr> geoFilters) {
+ this.geoFilters = geoFilters;
+ return this;
+ }
+
+ /**
+ * @param temporalFilters - The temporal filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setTemporalFilters(final Collection<IndexingExpr> temporalFilters) {
+ this.temporalFilters = temporalFilters;
+ return this;
+ }
+
+ /**
+ * @param usedFilters - The filter(s) used by the {@link EventQueryNode}
+ * @return - The Builder.
+ */
+ public EventQueryNodeBuilder setUsedFilters(final Collection<FunctionCall> usedFilters) {
+ this.usedFilters = usedFilters;
+ return this;
+ }
+
+ /**
+ * @return The {@link EventQueryNode} built from the values set on this builder.
+ */
+ public EventQueryNode build() {
+ return new EventQueryNode(store, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
index c9f4658..1c62407 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
@@ -26,7 +26,7 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.geotemporal.model.Event;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException;
-import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.MongoDocumentUpdater;
import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Performs update operations over an {@link EventStorage}.
*/
@DefaultAnnotation(NonNull.class)
-public class EventUpdater implements DocumentUpdater<RyaURI, Event>{
+public class EventUpdater implements MongoDocumentUpdater<RyaURI, Event>{
private final EventStorage events;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
index 352dcb6..ab44ffe 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
@@ -63,7 +63,8 @@ import com.vividsolutions.jts.io.WKTReader;
import jline.internal.Log;
/**
- * TODO: doc
+ * Storage adapter for serializing Geo Temporal statements into mongo objects.
+ * This includes adapting the {@link IndexingExpr}s for the GeoTemporal indexer.
*/
public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
index 8ddf075..9c13c8b 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java
@@ -31,10 +31,10 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.IndexingExpr;
import org.apache.rya.indexing.entity.model.TypedEntity;
import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException;
+import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
import org.apache.rya.indexing.geotemporal.model.Event;
import org.apache.rya.indexing.geotemporal.storage.EventStorage;
-import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.Document;
@@ -138,9 +138,8 @@ public class MongoEventStorage implements EventStorage {
.iterator();
final List<Event> events = new ArrayList<>();
- final EventDocumentConverter adapter = new EventDocumentConverter();
while(results.hasNext()) {
- events.add(adapter.fromDocument(results.next()));
+ events.add(EVENT_CONVERTER.fromDocument(results.next()));
}
return events;
} catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
index 1baab18..34df399 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java
@@ -206,12 +206,16 @@ public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMon
@Override
public EventStorage getEventStorage(final Configuration conf) {
+ requireNonNull(conf);
+
if(events.get() != null) {
return events.get();
}
- final MongoDBRdfConfiguration mongoConf = (MongoDBRdfConfiguration) conf;
+
+ final MongoDBRdfConfiguration mongoConf = new MongoDBRdfConfiguration(conf);
mongoClient = mongoConf.getMongoClient();
+ configuration.set(mongoConf);
if (mongoClient == null) {
mongoClient = MongoConnectorFactory.getMongoClient(conf);
}