You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/12 21:25:35 UTC
[7/8] incubator-rya git commit: RYA-303 Mongo PCJ Support. Closes
#172.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
deleted file mode 100644
index 9311200..0000000
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.accumulo;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.utils.ConnectorFactory;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.api.instance.RyaDetails;
-import org.apache.rya.indexing.FilterFunctionOptimizer;
-import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
-import org.apache.rya.indexing.accumulo.entity.EntityOptimizer;
-import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import org.apache.rya.indexing.accumulo.freetext.LuceneTokenizer;
-import org.apache.rya.indexing.accumulo.freetext.Tokenizer;
-import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import org.apache.rya.indexing.entity.EntityIndexOptimizer;
-import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
-import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
-import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer;
-import org.apache.rya.indexing.pcj.matching.PCJOptimizer;
-import org.apache.rya.indexing.statement.metadata.matching.StatementMetadataOptimizer;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.URIImpl;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
-/**
- * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
- * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods.
- * New code must separate parameters that are set at Rya install time from that which is specific to the client.
- * Also Accumulo index tables are pushed down to the implementation and not configured in conf.
- */
-public class ConfigUtils {
- private static final Logger logger = Logger.getLogger(ConfigUtils.class);
-
- /**
- * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_TBL_PREFIX} instead.
- */
- @Deprecated
- public static final String CLOUDBASE_TBL_PREFIX = RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX;
-
- /**
- * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_INSTANCE} instead.
- */
- @Deprecated
- public static final String CLOUDBASE_INSTANCE = AccumuloRdfConfiguration.CLOUDBASE_INSTANCE;
-
- /**
- * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_ZOOKEEPERS} instead.
- */
- @Deprecated
- public static final String CLOUDBASE_ZOOKEEPERS = AccumuloRdfConfiguration.CLOUDBASE_ZOOKEEPERS;
-
- /**
- * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_USER} instead.
- */
- @Deprecated
- public static final String CLOUDBASE_USER = AccumuloRdfConfiguration.CLOUDBASE_USER;
-
- /**
- * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_PASSWORD} instead.
- */
- @Deprecated
- public static final String CLOUDBASE_PASSWORD = AccumuloRdfConfiguration.CLOUDBASE_PASSWORD;
- /**
- * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_QUERY_AUTH} instead.
- */
- @Deprecated
- public static final String CLOUDBASE_AUTHS = RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH;
-
- public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads";
- public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency";
- public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory";
-
- public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit";
-
- public static final String USE_FREETEXT = "sc.use_freetext";
- public static final String USE_TEMPORAL = "sc.use_temporal";
- public static final String USE_ENTITY = "sc.use_entity";
- public static final String USE_PCJ = "sc.use_pcj";
- public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
- public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
-
- public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
- public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
- public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
- public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
-
- public static final String USE_MOCK_INSTANCE = AccumuloRdfConfiguration.USE_MOCK_INSTANCE;
-
- public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
-
- private static final int WRITER_MAX_WRITE_THREADS = 1;
- private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE;
- private static final long WRITER_MAX_MEMORY = 10000L;
-
- public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan";
-
- public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates";
- public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text";
- public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term";
-
- public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class";
-
- public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
-
- public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
-
- public static final String USE_MONGO = "sc.useMongo";
-
- public static boolean isDisplayQueryPlan(final Configuration conf) {
- return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
- }
-
- /**
- * get a value from the configuration file and throw an exception if the
- * value does not exist.
- *
- * @param conf
- * @param key
- * @return
- */
- private static String getStringCheckSet(final Configuration conf, final String key) {
- final String value = conf.get(key);
- requireNonNull(value, key + " not set");
- return value;
- }
-
- /**
- * @param conf
- * @param tablename
- * @return if the table was created
- * @throws AccumuloException
- * @throws AccumuloSecurityException
- * @throws TableExistsException
- */
- public static boolean createTableIfNotExists(final Configuration conf, final String tablename)
- throws AccumuloException, AccumuloSecurityException, TableExistsException {
- final TableOperations tops = getConnector(conf).tableOperations();
- if (!tops.exists(tablename)) {
- logger.info("Creating table: " + tablename);
- tops.create(tablename);
- return true;
- }
- return false;
- }
-
- /**
- * Lookup the table name prefix in the conf and throw an error if it is
- * null. Future, get table prefix from RyaDetails -- the Rya instance name
- * -- also getting info from the RyaDetails should happen within
- * RyaSailFactory and not ConfigUtils.
- *
- * @param conf
- * Rya configuration map where it extracts the prefix (instance
- * name)
- * @return index table prefix corresponding to this Rya instance
- */
- public static String getTablePrefix(final Configuration conf) {
- final String tablePrefix;
- tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
- requireNonNull(tablePrefix,
- "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. Cannot generate table name.");
- return tablePrefix;
- }
-
- public static int getFreeTextTermLimit(final Configuration conf) {
- return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
- }
-
- public static Set<URI> getFreeTextPredicates(final Configuration conf) {
- return getPredicates(conf, FREETEXT_PREDICATES_LIST);
- }
-
- public static Set<URI> getGeoPredicates(final Configuration conf) {
- return getPredicates(conf, GEO_PREDICATES_LIST);
- }
-
- /**
- * Used for indexing statements about date & time instances and intervals.
- *
- * @param conf
- * @return Set of predicate URI's whose objects should be date time
- * literals.
- */
- public static Set<URI> getTemporalPredicates(final Configuration conf) {
- return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
- }
-
- protected static Set<URI> getPredicates(final Configuration conf, final String confName) {
- final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
- final Set<URI> predicates = new HashSet<>();
- for (final String prediateString : validPredicateStrings) {
- predicates.add(new URIImpl(prediateString));
- }
- return predicates;
- }
-
- public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
- final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
- return ReflectionUtils.newInstance(c, conf);
- }
-
- public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf)
- throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
- final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
- final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
- final Connector connector = ConfigUtils.getConnector(conf);
- return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
- }
-
- public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf)
- throws AccumuloException, AccumuloSecurityException {
- final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
- final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
- final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
- final Connector connector = ConfigUtils.getConnector(conf);
- return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
- }
-
- public static Scanner createScanner(final String tablename, final Configuration conf)
- throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- final Connector connector = ConfigUtils.getConnector(conf);
- final Authorizations auths = ConfigUtils.getAuthorizations(conf);
- return connector.createScanner(tablename, auths);
-
- }
-
- public static BatchScanner createBatchScanner(final String tablename, final Configuration conf)
- throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- final Connector connector = ConfigUtils.getConnector(conf);
- final Authorizations auths = ConfigUtils.getAuthorizations(conf);
- Integer numThreads = null;
- if (conf instanceof RdfCloudTripleStoreConfiguration) {
- numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
- } else {
- numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2);
- }
- return connector.createBatchScanner(tablename, auths, numThreads);
- }
-
- public static int getWriterMaxWriteThreads(final Configuration conf) {
- return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
- }
-
- public static long getWriterMaxLatency(final Configuration conf) {
- return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY);
- }
-
- public static long getWriterMaxMemory(final Configuration conf) {
- return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
- }
-
- public static String getUsername(final JobContext job) {
- return getUsername(job.getConfiguration());
- }
-
- /**
- * Get the Accumulo username from the configuration object that is meant to
- * be used when connecting a {@link Connector} to Accumulo.
- *
- * @param conf - The configuration object that will be interrogated. (not null)
- * @return The username if one could be found; otherwise {@code null}.
- */
- public static String getUsername(final Configuration conf) {
- return new AccumuloRdfConfiguration(conf).getUsername();
- }
-
- public static Authorizations getAuthorizations(final JobContext job) {
- return getAuthorizations(job.getConfiguration());
- }
-
- public static Authorizations getAuthorizations(final Configuration conf) {
- final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "");
- if (authString.isEmpty()) {
- return new Authorizations();
- }
- return new Authorizations(authString.split(","));
- }
-
- public static Instance getInstance(final JobContext job) {
- return getInstance(job.getConfiguration());
- }
-
- /**
- * Create an {@link Instance} that may be used to create {@link Connector}s
- * to Accumulo. If the configuration has the {@link #USE_MOCK_INSTANCE} flag
- * set, then the instance will be be a {@link MockInstance} instead of a
- * Zookeeper backed instance.
- *
- * @param conf - The configuration object that will be interrogated. (not null)
- * @return The {@link Instance} that may be used to connect to Accumulo.
- */
- public static Instance getInstance(final Configuration conf) {
- // Pull out the Accumulo specific configuration values.
- final AccumuloRdfConfiguration accConf = new AccumuloRdfConfiguration(conf);
- String instanceName = accConf.getInstanceName();
- String zoookeepers = accConf.getZookeepers();
-
- // Create an Instance a mock if the mock flag is set.
- if (useMockInstance(conf)) {
- return new MockInstance(instanceName);
- }
-
- // Otherwise create an Instance to a Zookeeper managed instance of Accumulo.
- return new ZooKeeperInstance(instanceName, zoookeepers);
- }
-
- public static String getPassword(final JobContext job) {
- return getPassword(job.getConfiguration());
- }
-
- /**
- * Get the Accumulo password from the configuration object that is meant to
- * be used when connecting a {@link Connector} to Accumulo.
- *
- * @param conf - The configuration object that will be interrogated. (not null)
- * @return The password if one could be found; otherwise an empty string.
- */
- public static String getPassword(final Configuration conf) {
- return new AccumuloRdfConfiguration(conf).getPassword();
- }
-
- public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
- return getConnector(job.getConfiguration());
- }
-
- /**
- * Create an Accumulo {@link Connector} using the configured connection information.
- * If the connection information points to a mock instance of Accumulo, then the
- * {@link #USE_MOCK_INSTANCE} flag must be set.
- *
- * @param conf - Configures how the connector will be built. (not null)
- * @return A {@link Connector} that may be used to interact with the configured Accumulo instance.
- * @throws AccumuloException The connector couldn't be created because of an Accumulo problem.
- * @throws AccumuloSecurityException The connector couldn't be created because of an Accumulo security violation.
- */
- public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
- return ConnectorFactory.connect( new AccumuloRdfConfiguration(conf) );
- }
-
- /**
- * Indicates that a Mock instance of Accumulo is being used to back the Rya instance.
- *
- * @param conf - The configuration object that will be interrogated. (not null)
- * @return {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}.
- */
- public static boolean useMockInstance(final Configuration conf) {
- return new AccumuloRdfConfiguration(conf).useMockInstance();
- }
-
- protected static int getNumPartitions(final Configuration conf) {
- return conf.getInt(NUM_PARTITIONS, 25);
- }
-
- public static int getFreeTextDocNumPartitions(final Configuration conf) {
- return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
- }
-
- public static int getFreeTextTermNumPartitions(final Configuration conf) {
- return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
- }
-
- public static boolean getUseFreeText(final Configuration conf) {
- return conf.getBoolean(USE_FREETEXT, false);
- }
-
- public static boolean getUseTemporal(final Configuration conf) {
- return conf.getBoolean(USE_TEMPORAL, false);
- }
-
- public static boolean getUseEntity(final Configuration conf) {
- return conf.getBoolean(USE_ENTITY, false);
- }
-
- public static boolean getUsePCJ(final Configuration conf) {
- return conf.getBoolean(USE_PCJ, false);
- }
-
- public static boolean getUseOptimalPCJ(final Configuration conf) {
- return conf.getBoolean(USE_OPTIMAL_PCJ, false);
- }
-
- public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
- return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
- }
-
-
- /**
- * @return The name of the Fluo Application this instance of RYA is using to
- * incrementally update PCJs.
- */
- // TODO delete this eventually and use Details table
- public Optional<String> getFluoAppName(final Configuration conf) {
- return Optional.fromNullable(conf.get(FLUO_APP_NAME));
- }
-
-
- public static boolean getUseMongo(final Configuration conf) {
- return conf.getBoolean(USE_MONGO, false);
- }
-
-
- public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
-
- final List<String> indexList = Lists.newArrayList();
- final List<String> optimizers = Lists.newArrayList();
-
- boolean useFilterIndex = false;
-
- if (ConfigUtils.getUseMongo(conf)) {
- if (getUseFreeText(conf)) {
- indexList.add(MongoFreeTextIndexer.class.getName());
- useFilterIndex = true;
- }
-
- if (getUseEntity(conf)) {
- indexList.add(MongoEntityIndexer.class.getName());
- optimizers.add(EntityIndexOptimizer.class.getName());
- }
-
- if (getUseTemporal(conf)) {
- indexList.add(MongoTemporalIndexer.class.getName());
- useFilterIndex = true;
- }
- } else {
- if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
- conf.setPcjOptimizer(PCJOptimizer.class);
- }
-
- if (getUsePcjUpdaterIndex(conf)) {
- indexList.add(PrecomputedJoinIndexer.class.getName());
- }
-
- if (getUseFreeText(conf)) {
- indexList.add(AccumuloFreeTextIndexer.class.getName());
- useFilterIndex = true;
- }
-
- if (getUseTemporal(conf)) {
- indexList.add(AccumuloTemporalIndexer.class.getName());
- useFilterIndex = true;
- }
-
- if (getUseEntity(conf)) {
- indexList.add(EntityCentricIndex.class.getName());
- optimizers.add(EntityOptimizer.class.getName());
- }
- }
-
- if (useFilterIndex) {
- optimizers.add(FilterFunctionOptimizer.class.getName());
- }
-
- if (conf.getUseStatementMetadata()) {
- optimizers.add(StatementMetadataOptimizer.class.getName());
- }
-
-<<<<<<< HEAD
- conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[] {}));
- conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[] {}));
- }
-}
-=======
- final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS);
- if(existingIndexers != null ) {
- for(final String idx : existingIndexers) {
- indexList.add(idx);
- }
- }
-
- final String[] existingOptimizers = conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS);
- if(existingOptimizers != null ) {
- for(final String opt : existingOptimizers) {
- optimizers.add(opt);
- }
- }
-
- conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
- conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
- }
-
-
-
-}
->>>>>>> RYA-236 Changes to other indexers
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
index d73e180..4faf4a0 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java
@@ -47,7 +47,11 @@ public class PrecomputedJoinIndexerConfig {
/**
* Stores each PCJ within an Accumulo table.
*/
- ACCUMULO;
+ ACCUMULO,
+ /**
+ * Stores each PCJ within a MongoDB collection.
+ */
+ MONGO;
}
/**
@@ -106,8 +110,6 @@ public class PrecomputedJoinIndexerConfig {
return Optional.fromNullable(updaterType);
}
-
-
public boolean getUseFluoUpdater() {
return config.getBoolean(ConfigUtils.USE_PCJ_UPDATER_INDEX, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java
new file mode 100644
index 0000000..d3fa07e
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.pcj;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.openrdf.query.MalformedQueryException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.mongodb.MongoClient;
+
+/**
+ * Implementation of {@link AbstractPcjIndexSetProvider} for MongoDB.
+ */
+public class MongoPcjIndexSetProvider extends AbstractPcjIndexSetProvider {
+ /**
+ * Creates a new {@link MongoPcjIndexSetProvider}.
+ * @param conf - The configuration for this provider. (not null)
+ */
+ public MongoPcjIndexSetProvider(final StatefulMongoDBRdfConfiguration conf) {
+ super(conf);
+ }
+
+ /**
+ * Creates a new {@link MongoPcjIndexSetProvider}.
+ * @param conf - The configuration for this provider.
+ * @param indices - The predefined indicies on this provider.
+ * @param client - The {@link MongoClient} used to connect to mongo.
+ */
+ public MongoPcjIndexSetProvider(final StatefulMongoDBRdfConfiguration conf, final List<ExternalTupleSet> indices) {
+ super(conf, indices);
+ }
+
+ @Override
+ protected List<ExternalTupleSet> getIndices() throws PcjIndexSetException {
+ try {
+ final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf;
+ final MongoClient client = mongoConf.getMongoClient();
+ final MongoPcjDocuments pcjDocs = new MongoPcjDocuments(client, mongoConf.getRyaInstanceName());
+ List<String> documents = null;
+
+ documents = mongoConf.getPcjTables();
+ // this maps associates pcj document name with pcj sparql query
+ final Map<String, String> indexDocuments = Maps.newLinkedHashMap();
+
+ try(final PrecomputedJoinStorage storage = new MongoPcjStorage(client, mongoConf.getRyaInstanceName())) {
+
+ final boolean docsProvided = documents != null && !documents.isEmpty();
+
+ if (docsProvided) {
+ // if tables provided, associate table name with sparql
+ for (final String doc : documents) {
+ indexDocuments.put(doc, storage.getPcjMetadata(doc).getSparql());
+ }
+ } else if (hasRyaDetails()) {
+ // If this is a newer install of Rya, and it has PCJ Details, then
+ // use those.
+ final List<String> ids = storage.listPcjs();
+ for (final String pcjId : ids) {
+ indexDocuments.put(pcjId, storage.getPcjMetadata(pcjId).getSparql());
+ }
+ } else {
+ // Otherwise figure it out by getting document IDs.
+ documents = pcjDocs.listPcjDocuments();
+ for (final String pcjId : documents) {
+ if (pcjId.startsWith("INDEX")) {
+ indexDocuments.put(pcjId, pcjDocs.getPcjMetadata(pcjId).getSparql());
+ }
+ }
+ }
+ }
+
+ final List<ExternalTupleSet> index = Lists.newArrayList();
+ if (indexDocuments.isEmpty()) {
+ log.info("No Index found");
+ } else {
+ for (final String pcjID : indexDocuments.keySet()) {
+ final String indexSparqlString = indexDocuments.get(pcjID);
+ index.add(new MongoPcjQueryNode(indexSparqlString, pcjID, pcjDocs));
+ }
+ }
+ return index;
+ } catch (final PCJStorageException | MalformedQueryException e) {
+ throw new PcjIndexSetException("Failed to get indicies for this PCJ index.", e);
+ }
+ }
+
+ private boolean hasRyaDetails() {
+ final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf;
+ final RyaDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(mongoConf.getMongoClient(), mongoConf.getRyaInstanceName());
+ try {
+ detailsRepo.getRyaInstanceDetails();
+ return true;
+ } catch (final RyaDetailsRepositoryException e) {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java
new file mode 100644
index 0000000..c03ee99
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.utils.CloseableIterator;
+import org.apache.rya.api.utils.IteratorWrapper;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.external.tupleSet.ParsedQueryUtil;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.ParsedTupleQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Indexing Node for PCJs expressions to be inserted into execution plans.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoPcjQueryNode extends ExternalTupleSet implements ExternalBatchingIterator {
+ private static final Logger log = Logger.getLogger(MongoPcjQueryNode.class);
+ private final String pcjId;
+ private final MongoPcjDocuments pcjDocs;
+
+ /**
+ * Creates a new {@link MongoPcjQueryNode}.
+ *
+ * @param sparql - sparql query whose results will be stored in PCJ document. (not empty of null)
+ * @param pcjId - name of an existing PCJ. (not empty or null)
+ * @param pcjDocs - {@link MongoPcjDocuments} used to maintain PCJs in mongo. (not null)
+ *
+ * @throws MalformedQueryException - The SPARQL query needs to contain a projection.
+ */
+ public MongoPcjQueryNode(final String sparql, final String pcjId, final MongoPcjDocuments pcjDocs) throws MalformedQueryException {
+ checkArgument(!Strings.isNullOrEmpty(sparql));
+ checkArgument(!Strings.isNullOrEmpty(pcjId));
+ this.pcjDocs = checkNotNull(pcjDocs);
+ this.pcjId = pcjId;
+ final SPARQLParser sp = new SPARQLParser();
+ final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);
+ final TupleExpr te = pq.getTupleExpr();
+ Preconditions.checkArgument(PCJOptimizerUtilities.isPCJValid(te), "TupleExpr is an invalid PCJ.");
+
+ final Optional<Projection> projection = new ParsedQueryUtil().findProjection(pq);
+ if (!projection.isPresent()) {
+ throw new MalformedQueryException("SPARQL query '" + sparql + "' does not contain a Projection.");
+ }
+ setProjectionExpr(projection.get());
+ }
+
+ /**
+ * Creates a new {@link MongoPcjQueryNode}.
+ *
+ * @param conf - configuration to use to connect to mongo. (not null)
+ * @param pcjId - name of an existing PCJ. (not empty or null)
+ */
+ public MongoPcjQueryNode(final Configuration conf, final String pcjId) {
+ checkNotNull(conf);
+ checkArgument(conf instanceof StatefulMongoDBRdfConfiguration,
+ "The configuration must be a StatefulMongoDBRdfConfiguration, found: " + conf.getClass().getSimpleName());
+ checkArgument(!Strings.isNullOrEmpty(pcjId));
+ final StatefulMongoDBRdfConfiguration statefulConf = (StatefulMongoDBRdfConfiguration) conf;
+ pcjDocs = new MongoPcjDocuments(statefulConf.getMongoClient(), statefulConf.getRyaInstanceName());
+ this.pcjId = checkNotNull(pcjId);
+ }
+
+ /**
+ * returns size of table for query planning
+ */
+ @Override
+ public double cardinality() {
+ double cardinality = 0;
+ try {
+ cardinality = pcjDocs.getPcjMetadata(pcjId).getCardinality();
+ } catch (final PcjException e) {
+ log.error("The PCJ has not been created, so has no cardinality.", e);
+ }
+ return cardinality;
+ }
+
+ @Override
+ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindingset)
+ throws QueryEvaluationException {
+ return this.evaluate(Collections.singleton(bindingset));
+ }
+
+ @Override
+ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
+ throws QueryEvaluationException {
+
+ if (bindingset.isEmpty()) {
+ return new IteratorWrapper<BindingSet, QueryEvaluationException>(new HashSet<BindingSet>().iterator());
+ }
+ final CloseableIterator<BindingSet> iter = pcjDocs.getResults(pcjId, bindingset);
+ return new CloseableIteration<BindingSet, QueryEvaluationException>() {
+ @Override
+ public boolean hasNext() throws QueryEvaluationException {
+ return iter.hasNext();
+ }
+
+ @Override
+ public BindingSet next() throws QueryEvaluationException {
+ final BindingSet bs = iter.next();
+ return bs;
+ }
+
+ @Override
+ public void remove() throws QueryEvaluationException {
+ iter.remove();
+ }
+
+ @Override
+ public void close() throws QueryEvaluationException {
+ try {
+ iter.close();
+ } catch (final Exception e) {
+ throw new QueryEvaluationException(e.getMessage(), e);
+ }
+ }
+ };
+ }
+
+ @Override
+ public String getSignature() {
+ return "(Mongo PcjQueryNode) " + Joiner.on(", ").join(super.getTupleExpr().getProjectionElemList().getElements()).replaceAll("\\s+", " ");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
deleted file mode 100644
index 4a15665..0000000
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.matching;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.api.instance.RyaDetailsRepository;
-import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
-import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.matching.ExternalSetProvider;
-import org.apache.rya.indexing.external.matching.QuerySegment;
-import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet;
-import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.sail.SailException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Implementation of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s.
- * This provider uses either user specified Accumulo configuration information or user a specified
- * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If Accumulo configuration
- * is provided, the provider connects to an instance of RyaDetails and populates the cache with
- * PCJs registered in RyaDetails.
- *
- */
-public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTupleSet> {
-
- private static final Logger log = Logger.getLogger(ExternalSetProvider.class);
- private static final PCJToSegmentConverter converter = new PCJToSegmentConverter();
- private List<ExternalTupleSet> indexCache;
- private final Configuration conf;
- private boolean init = false;
-
- public AccumuloIndexSetProvider(final Configuration conf) {
- this.conf = Objects.requireNonNull(conf);
- }
-
- public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
- this(conf);
- indexCache = indices;
- init = true;
- }
-
- /**
- *
- * @return - size of underlying PCJ cache
- * @throws Exception
- */
- public int size() throws Exception {
- if(!init) {
- indexCache = PCJOptimizerUtilities.getValidPCJs(getAccIndices());
- init = true;
- }
- return indexCache.size();
- }
-
- /**
- * @param segment - QuerySegment used to get relevant queries form index cache for matching
- * @return List of PCJs for matching
- */
- @Override
- public List<ExternalTupleSet> getExternalSets(final QuerySegment<ExternalTupleSet> segment) {
- try {
- if(!init) {
- indexCache = PCJOptimizerUtilities.getValidPCJs(getAccIndices());
- init = true;
- }
- final TupleExpr query = segment.getQuery().getTupleExpr();
- final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(query, indexCache);
- final List<ExternalTupleSet> pcjs = iep.getNormalizedIndices();
- final List<ExternalTupleSet> tuples = new ArrayList<>();
- for (final ExternalTupleSet tuple: pcjs) {
- final QuerySegment<ExternalTupleSet> pcj = converter.setToSegment(tuple);
- if (segment.containsQuerySegment(pcj)) {
- tuples.add(tuple);
- }
- }
- return tuples;
-
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * @param segment - QuerySegment used to get relevant queries form index cache for matching
- *
- * @return Iterator of Lists (combos) of PCJs used to build an optimal query plan
- */
- @Override
- public Iterator<List<ExternalTupleSet>> getExternalSetCombos(final QuerySegment<ExternalTupleSet> segment) {
- final ValidIndexCombinationGenerator comboGen = new ValidIndexCombinationGenerator(segment.getOrderedNodes());
- return comboGen.getValidIndexCombos(getExternalSets(segment));
- }
-
- /**
- *
- *
- * @param conf
- * - client configuration
- *
- * @return - list of {@link ExternalTupleSet}s or PCJs that are either
- * specified by user in Configuration or exist in system.
- *
- * @throws MalformedQueryException
- * @throws SailException
- * @throws QueryEvaluationException
- * @throws TableNotFoundException
- * @throws AccumuloException
- * @throws AccumuloSecurityException
- * @throws PcjException
- */
- private List<ExternalTupleSet> getAccIndices() throws Exception {
-
- Objects.requireNonNull(conf);
- final String tablePrefix = Objects.requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
- final Connector conn = Objects.requireNonNull(ConfigUtils.getConnector(conf));
- List<String> tables = null;
-
- if (conf instanceof RdfCloudTripleStoreConfiguration) {
- tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables();
- }
- // this maps associates pcj table name with pcj sparql query
- final Map<String, String> indexTables = Maps.newLinkedHashMap();
-
- try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) {
- final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
-
- final boolean tablesProvided = tables != null && !tables.isEmpty();
-
- if (tablesProvided) {
- // if tables provided, associate table name with sparql
- for (final String table : tables) {
- indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
- }
- } else if (hasRyaDetails(tablePrefix, conn)) {
- // If this is a newer install of Rya, and it has PCJ Details, then
- // use those.
- final List<String> ids = storage.listPcjs();
- for (final String id : ids) {
- indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
- }
- } else {
- // Otherwise figure it out by scanning tables.
- final PcjTables pcjTables = new PcjTables();
- for (final String table : conn.tableOperations().list()) {
- if (table.startsWith(tablePrefix + "INDEX")) {
- indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
- }
- }
- }
- }
-
- // use table name sparql map (indexTables) to create {@link
- // AccumuloIndexSet}
- final List<ExternalTupleSet> index = Lists.newArrayList();
- if (indexTables.isEmpty()) {
- log.info("No Index found");
- } else {
- for (final String table : indexTables.keySet()) {
- final String indexSparqlString = indexTables.get(table);
- index.add(new AccumuloIndexSet(indexSparqlString, conf, table));
- }
- }
-
-
- return index;
- }
-
- private static boolean hasRyaDetails(final String ryaInstanceName, final Connector conn) {
- final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(conn, ryaInstanceName);
- try {
- detailsRepo.getRyaInstanceDetails();
- return true;
- } catch (final RyaDetailsRepositoryException e) {
- return false;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
index 75b48b4..8067a85 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java
@@ -33,6 +33,10 @@ import org.apache.rya.indexing.external.matching.QueryNodeListRater;
import org.apache.rya.indexing.external.matching.QuerySegment;
import org.apache.rya.indexing.external.matching.TopOfQueryFilterRelocator;
import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.mongodb.pcj.MongoPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider;
+import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
import org.openrdf.query.BindingSet;
import org.openrdf.query.Dataset;
import org.openrdf.query.algebra.QueryModelNode;
@@ -58,29 +62,32 @@ import com.google.common.base.Optional;;
*/
public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>implements Configurable {
private static final PCJExternalSetMatcherFactory factory = new PCJExternalSetMatcherFactory();
- private AccumuloIndexSetProvider provider;
+ private AbstractPcjIndexSetProvider provider;
private Configuration conf;
private boolean init = false;
public PCJOptimizer() {}
public PCJOptimizer(final Configuration conf) {
- setConf(conf);
+ setConf(conf);
}
/**
* This constructor is designed to be used for testing. A more typical use
* pattern is for a user to specify Accumulo connection details in a Configuration
* file so that PCJs can be retrieved by an AccumuloIndexSetProvider.
- *
- * @param indices - user specified PCJs to match to query
+ *
+ * @param indices - user specified PCJs to match to query. (not null)
* @param useOptimalPcj - optimize PCJ combos for matching
+ * @param provider - The provider to use in this optimizer. (not null)
*/
- public PCJOptimizer(final List<ExternalTupleSet> indices, final boolean useOptimalPcj) {
+ public PCJOptimizer(final List<ExternalTupleSet> indices, final boolean useOptimalPcj,
+ final AbstractPcjIndexSetProvider provider) {
checkNotNull(indices);
+ checkNotNull(provider);
conf = new Configuration();
- this.useOptimal = useOptimalPcj;
- provider = new AccumuloIndexSetProvider(conf, indices);
+ useOptimal = useOptimalPcj;
+ this.provider = provider;
init = true;
}
@@ -90,9 +97,14 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>
if (!init) {
try {
this.conf = conf;
- this.useOptimal = ConfigUtils.getUseOptimalPCJ(conf);
- provider = new AccumuloIndexSetProvider(conf);
- } catch (Exception e) {
+ useOptimal = ConfigUtils.getUseOptimalPCJ(conf);
+ if (conf instanceof StatefulMongoDBRdfConfiguration) {
+ final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf;
+ provider = new MongoPcjIndexSetProvider(mongoConf);
+ } else {
+ provider = new AccumuloIndexSetProvider(conf);
+ }
+ } catch (final Exception e) {
throw new Error(e);
}
init = true;
@@ -124,14 +136,14 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>
} else {
return;
}
- } catch (Exception e) {
- throw new RuntimeException("Could not populate Accumulo Index Cache.");
+ } catch (final Exception e) {
+ throw new RuntimeException("Could not populate Index Cache.", e);
}
}
@Override
- protected ExternalSetMatcher<ExternalTupleSet> getMatcher(QuerySegment<ExternalTupleSet> segment) {
+ protected ExternalSetMatcher<ExternalTupleSet> getMatcher(final QuerySegment<ExternalTupleSet> segment) {
return factory.getMatcher(segment);
}
@@ -141,7 +153,7 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>
}
@Override
- protected Optional<QueryNodeListRater> getNodeListRater(QuerySegment<ExternalTupleSet> segment) {
+ protected Optional<QueryNodeListRater> getNodeListRater(final QuerySegment<ExternalTupleSet> segment) {
return Optional.of(new BasicRater(segment.getOrderedNodes()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
index 1ad03b6..09a2706 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java
@@ -27,7 +27,6 @@ import java.util.Set;
import org.apache.rya.indexing.external.matching.QuerySegment;
import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.apache.rya.indexing.pcj.matching.QueryVariableNormalizer.VarCollector;
-
import org.openrdf.query.algebra.Difference;
import org.openrdf.query.algebra.EmptySet;
import org.openrdf.query.algebra.Filter;
@@ -61,7 +60,7 @@ public class PCJOptimizerUtilities {
* @param node - node to be checked for validity
* @return - true if valid and false otherwise
*/
- public static boolean isPCJValid(TupleExpr node) {
+ public static boolean isPCJValid(final TupleExpr node) {
final ValidQueryVisitor vqv = new ValidQueryVisitor();
node.visit(vqv);
@@ -91,16 +90,16 @@ public class PCJOptimizerUtilities {
* @param node - PCJ {@link ExternalTupleSet} index node to be checked for validity
* @return - true if valid and false otherwise
*/
- public static boolean isPCJValid(ExternalTupleSet node) {
+ public static boolean isPCJValid(final ExternalTupleSet node) {
return isPCJValid(node.getTupleExpr());
}
public static List<ExternalTupleSet> getValidPCJs(
- List<ExternalTupleSet> pcjs) {
+ final List<ExternalTupleSet> pcjs) {
- Iterator<ExternalTupleSet> iterator = pcjs.iterator();
+ final Iterator<ExternalTupleSet> iterator = pcjs.iterator();
while (iterator.hasNext()) {
- ExternalTupleSet pcj = iterator.next();
+ final ExternalTupleSet pcj = iterator.next();
if (!isPCJValid(pcj)) {
iterator.remove();
}
@@ -109,8 +108,8 @@ public class PCJOptimizerUtilities {
}
- public static Projection getProjection(TupleExpr te) {
- ProjectionVisitor visitor = new ProjectionVisitor();
+ public static Projection getProjection(final TupleExpr te) {
+ final ProjectionVisitor visitor = new ProjectionVisitor();
te.visit(visitor);
return visitor.node;
}
@@ -120,7 +119,7 @@ public class PCJOptimizerUtilities {
Projection node = null;
@Override
- public void meet(Projection node) {
+ public void meet(final Projection node) {
this.node = node;
}
}
@@ -131,13 +130,13 @@ public class PCJOptimizerUtilities {
* - filters to be pushed down into next {@link QuerySegment}, or
* as far down as binding variable names permit.
*/
- public static void relocateFilters(Set<Filter> filters) {
- for (Filter filter : filters) {
+ public static void relocateFilters(final Set<Filter> filters) {
+ for (final Filter filter : filters) {
FilterRelocator.relocate(filter);
}
}
- private static Set<String> getVarNames(Collection<QueryModelNode> nodes) {
+ private static Set<String> getVarNames(final Collection<QueryModelNode> nodes) {
List<String> tempVars;
final Set<String> nodeVarNames = Sets.newHashSet();
@@ -159,8 +158,8 @@ public class PCJOptimizerUtilities {
QueryModelVisitorBase<RuntimeException> {
private boolean isValid = true;
- private Set<QueryModelNode> filterSet = Sets.newHashSet();
- private Set<QueryModelNode> spSet = Sets.newHashSet();
+ private final Set<QueryModelNode> filterSet = Sets.newHashSet();
+ private final Set<QueryModelNode> spSet = Sets.newHashSet();
private int joinCount = 0;
public Set<QueryModelNode> getFilters() {
@@ -180,35 +179,35 @@ public class PCJOptimizerUtilities {
}
@Override
- public void meet(Projection node) {
+ public void meet(final Projection node) {
node.getArg().visit(this);
}
@Override
- public void meet(Filter node) {
+ public void meet(final Filter node) {
filterSet.add(node.getCondition());
node.getArg().visit(this);
}
@Override
- public void meet(StatementPattern node) {
+ public void meet(final StatementPattern node) {
spSet.add(node);
}
@Override
- public void meet(Join node) {
+ public void meet(final Join node) {
joinCount++;
super.meet(node);
}
@Override
- public void meet(LeftJoin node) {
+ public void meet(final LeftJoin node) {
joinCount++;
super.meet(node);
}
@Override
- public void meetNode(QueryModelNode node) {
+ public void meetNode(final QueryModelNode node) {
if (!(node instanceof Join || node instanceof LeftJoin
|| node instanceof StatementPattern || node instanceof Var
|| node instanceof Union || node instanceof Filter || node instanceof Projection)) {
@@ -238,18 +237,18 @@ public class PCJOptimizerUtilities {
protected Filter filter;
protected Set<String> filterVars;
- public FilterRelocator(Filter filter) {
+ public FilterRelocator(final Filter filter) {
this.filter = filter;
filterVars = VarNameCollector.process(filter.getCondition());
}
- public static void relocate(Filter filter) {
+ public static void relocate(final Filter filter) {
final FilterRelocator fr = new FilterRelocator(filter);
filter.visit(fr);
}
@Override
- protected void meetNode(QueryModelNode node) {
+ protected void meetNode(final QueryModelNode node) {
// By default, do not traverse
assert node instanceof TupleExpr;
@@ -263,7 +262,7 @@ public class PCJOptimizerUtilities {
}
@Override
- public void meet(Join join) {
+ public void meet(final Join join) {
if (join.getRightArg().getBindingNames().containsAll(filterVars)) {
// All required vars are bound by the left expr
join.getRightArg().visit(this);
@@ -277,12 +276,12 @@ public class PCJOptimizerUtilities {
}
@Override
- public void meet(Filter node) {
+ public void meet(final Filter node) {
node.getArg().visit(this);
}
@Override
- public void meet(LeftJoin leftJoin) {
+ public void meet(final LeftJoin leftJoin) {
if (leftJoin.getLeftArg().getBindingNames().containsAll(filterVars)) {
leftJoin.getLeftArg().visit(this);
} else {
@@ -291,7 +290,7 @@ public class PCJOptimizerUtilities {
}
@Override
- public void meet(Union union) {
+ public void meet(final Union union) {
boolean filterMoved = false;
if (Sets.intersection(union.getRightArg().getBindingNames(), filterVars).size() > 0) {
relocate(filter, union.getRightArg());
@@ -300,7 +299,7 @@ public class PCJOptimizerUtilities {
if (Sets.intersection(union.getLeftArg().getBindingNames(), filterVars).size() > 0) {
if (filterMoved) {
- Filter clone = new Filter(filter.getArg(), filter.getCondition().clone());
+ final Filter clone = new Filter(filter.getArg(), filter.getCondition().clone());
relocate(clone, union.getLeftArg());
} else {
relocate(filter, union.getLeftArg());
@@ -309,36 +308,36 @@ public class PCJOptimizerUtilities {
}
@Override
- public void meet(Difference node) {
+ public void meet(final Difference node) {
if (Sets.intersection(node.getRightArg().getBindingNames(), filterVars).size() > 0) {
relocate(filter, node.getRightArg());
} else if (Sets.intersection(node.getLeftArg().getBindingNames(), filterVars).size() > 0) {
- Filter clone = new Filter(filter.getArg(), filter
+ final Filter clone = new Filter(filter.getArg(), filter
.getCondition().clone());
relocate(clone, node.getLeftArg());
}
}
@Override
- public void meet(Intersection node) {
+ public void meet(final Intersection node) {
if (Sets.intersection(node.getRightArg().getBindingNames(), filterVars).size() > 0) {
relocate(filter, node.getRightArg());
} else if (Sets.intersection(node.getLeftArg().getBindingNames(), filterVars).size() > 0) {
- Filter clone = new Filter(filter.getArg(), filter
+ final Filter clone = new Filter(filter.getArg(), filter
.getCondition().clone());
relocate(clone, node.getLeftArg());
}
}
@Override
- public void meet(EmptySet node) {
+ public void meet(final EmptySet node) {
if (filter.getParentNode() != null) {
// Remove filter from its original location
filter.replaceWith(filter.getArg());
}
}
- protected void relocate(Filter filter, TupleExpr newFilterArg) {
+ protected void relocate(final Filter filter, final TupleExpr newFilterArg) {
if (!filter.getArg().equals(newFilterArg)) {
if (filter.getParentNode() != null) {
// Remove filter from its original location
@@ -351,10 +350,8 @@ public class PCJOptimizerUtilities {
}
}
-
-
- public static boolean tupleContainsLeftJoins(TupleExpr node) {
- LeftJoinVisitor lj = new LeftJoinVisitor();
+ public static boolean tupleContainsLeftJoins(final TupleExpr node) {
+ final LeftJoinVisitor lj = new LeftJoinVisitor();
node.visit(lj);
return lj.containsLeftJoin;
}
@@ -368,18 +365,8 @@ public class PCJOptimizerUtilities {
}
@Override
- public void meet(LeftJoin node) {
+ public void meet(final LeftJoin node) {
containsLeftJoin = true;
}
}
-
-
-
-
-
-
-
-
-
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java
new file mode 100644
index 0000000..984153a
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.matching.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
+import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
+import org.apache.rya.indexing.external.matching.ExternalSetProvider;
+import org.apache.rya.indexing.external.matching.QuerySegment;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities;
+import org.apache.rya.indexing.pcj.matching.PCJToSegmentConverter;
+import org.openrdf.query.algebra.TupleExpr;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Abstraction of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s.
+ * Implementations of this use either a user specified configuration information or user a specified
+ * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If a configuration
+ * is provided, the provider connects to an instance of RyaDetails and populates the cache with
+ * PCJs registered in RyaDetails.
+ */
+public abstract class AbstractPcjIndexSetProvider implements ExternalSetProvider<ExternalTupleSet> {
+ protected static final Logger log = Logger.getLogger(AbstractPcjIndexSetProvider.class);
+ protected static final PCJToSegmentConverter converter = new PCJToSegmentConverter();
+ protected List<ExternalTupleSet> indexCache;
+ protected final Configuration conf;
+ protected boolean init = false;
+
+ /**
+ * Creates a new {@link AbstractPcjIndexSetProvider} based on configuration only.
+ * @param conf - The {@link Configuration} used to connect to {@link RyaDetails}.
+ */
+ public AbstractPcjIndexSetProvider(final Configuration conf) {
+ this.conf = requireNonNull(conf);
+ }
+
+ /**
+ * Creates a new {@link AbstractPcjIndexSetProvider} based user provided {@link ExternalTupleSet}s.
+ * @param conf - The {@link Configuration} used to connect to {@link RyaDetails}.
+ * @param indices - The {@link ExternalTupleSet}s to populate the internal cache.
+ */
+ public AbstractPcjIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
+ requireNonNull(conf);
+ this.conf = conf;
+ indexCache = indices;
+ init = true;
+ }
+
+
+ /**
+ *
+ * @param indices
+ */
+ @VisibleForTesting
+ public void setIndices(final List<ExternalTupleSet> indices) {
+ indexCache = indices;
+ init = true;
+ }
+
+ /**
+ * @param segment - QuerySegment used to get relevant queries form index cache for matching
+ *
+ * @return Iterator of Lists (combos) of PCJs used to build an optimal query plan
+ */
+ @Override
+ public Iterator<List<ExternalTupleSet>> getExternalSetCombos(final QuerySegment<ExternalTupleSet> segment) {
+ final ValidIndexCombinationGenerator comboGen = new ValidIndexCombinationGenerator(segment.getOrderedNodes());
+ return comboGen.getValidIndexCombos(getExternalSets(segment));
+ }
+
+ /**
+ * @param segment - QuerySegment used to get relevant queries form index cache for matching
+ * @return List of PCJs for matching
+ */
+ @Override
+ public List<ExternalTupleSet> getExternalSets(final QuerySegment<ExternalTupleSet> segment) {
+ try {
+ if(!init) {
+ indexCache = PCJOptimizerUtilities.getValidPCJs(getIndices());
+ init = true;
+ }
+ final TupleExpr query = segment.getQuery().getTupleExpr();
+ final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(query, indexCache);
+ final List<ExternalTupleSet> pcjs = iep.getNormalizedIndices();
+ final List<ExternalTupleSet> tuples = new ArrayList<>();
+ for (final ExternalTupleSet tuple: pcjs) {
+ final QuerySegment<ExternalTupleSet> pcj = converter.setToSegment(tuple);
+ if (segment.containsQuerySegment(pcj)) {
+ tuples.add(tuple);
+ }
+ }
+ return tuples;
+
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @return The size of the set index cache.
+ * @throws Exception
+ */
+ public int size() throws Exception {
+ if (!init) {
+ indexCache = PCJOptimizerUtilities.getValidPCJs(getIndices());
+ init = true;
+ }
+ return indexCache.size();
+ }
+
+ /**
+ * @param conf - client configuration
+ * @return - list of {@link ExternalTupleSet}s or PCJs that are either
+ * specified by user in Configuration or exist in system.
+ *
+ * @throws Exception
+ */
+ protected abstract List<ExternalTupleSet> getIndices() throws PcjIndexSetException;
+
+ /**
+ * Exception thrown when failing to get the defined PCJS for a particular
+ * index.
+ */
+ public class PcjIndexSetException extends Exception {
+ public PcjIndexSetException(final String message) {
+ super(message);
+ }
+
+ public PcjIndexSetException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java
new file mode 100644
index 0000000..1fa3677
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.matching.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.sail.SailException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of {@link AbstractPcjIndexSetProvider} for Accumulo.
+ * This provider uses either user specified Accumulo configuration information or user a specified
+ * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If Accumulo configuration
+ * is provided, the provider connects to an instance of RyaDetails and populates the cache with
+ * PCJs registered in RyaDetails.
+ */
+public class AccumuloIndexSetProvider extends AbstractPcjIndexSetProvider {
+ private static final Logger log = Logger.getLogger(AccumuloIndexSetProvider.class);
+
+ public AccumuloIndexSetProvider(final Configuration conf) {
+ super(conf);
+ }
+
+ public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) {
+ super(conf, indices);
+ }
+
+ @Override
+ protected List<ExternalTupleSet> getIndices() throws PcjIndexSetException {
+ requireNonNull(conf);
+ try {
+ final String tablePrefix = requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX));
+ final Connector conn = requireNonNull(ConfigUtils.getConnector(conf));
+ List<String> tables = null;
+
+ if (conf instanceof RdfCloudTripleStoreConfiguration) {
+ tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables();
+ }
+ // this maps associates pcj table name with pcj sparql query
+ final Map<String, String> indexTables = Maps.newLinkedHashMap();
+
+ try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) {
+ final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
+
+ final boolean tablesProvided = tables != null && !tables.isEmpty();
+
+ if (tablesProvided) {
+ // if tables provided, associate table name with sparql
+ for (final String table : tables) {
+ indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
+ }
+ } else if (hasRyaDetails(tablePrefix, conn)) {
+ // If this is a newer install of Rya, and it has PCJ Details, then
+ // use those.
+ final List<String> ids = storage.listPcjs();
+ for (final String id : ids) {
+ indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
+ }
+ } else {
+ // Otherwise figure it out by scanning tables.
+ final PcjTables pcjTables = new PcjTables();
+ for (final String table : conn.tableOperations().list()) {
+ if (table.startsWith(tablePrefix + "INDEX")) {
+ indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
+ }
+ }
+ }
+ }
+
+ // use table name sparql map (indexTables) to create {@link
+ // AccumuloIndexSet}
+ final List<ExternalTupleSet> index = Lists.newArrayList();
+ if (indexTables.isEmpty()) {
+ log.info("No Index found");
+ } else {
+ for (final String table : indexTables.keySet()) {
+ final String indexSparqlString = indexTables.get(table);
+ index.add(new AccumuloIndexSet(indexSparqlString, conf, table));
+ }
+ }
+
+ return index;
+ } catch (final PCJStorageException | AccumuloException | AccumuloSecurityException | MalformedQueryException
+ | SailException | QueryEvaluationException | TableNotFoundException e) {
+ throw new PcjIndexSetException("Failed to retrieve the indicies.", e);
+ }
+ }
+
+ private static boolean hasRyaDetails(final String ryaInstanceName, final Connector conn) {
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(conn, ryaInstanceName);
+ try {
+ detailsRepo.getRyaInstanceDetails();
+ return true;
+ } catch (final RyaDetailsRepositoryException e) {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
index 5028454..78b4f52 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -107,7 +107,7 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Run the test.
- ryaClient.getBatchUpdatePCJ().get().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+ ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
// Verify the correct results were loaded into the PCJ table.
final Set<BindingSet> expectedResults = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
index da717d7..a8c7455 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java
@@ -23,10 +23,9 @@ import static org.junit.Assert.assertTrue;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.rya.accumulo.AccumuloITBase;
-import org.apache.rya.api.client.Install;
import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
@@ -41,6 +40,14 @@ public class AccumuloInstallIT extends AccumuloITBase {
public void install() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException, NotInitializedException, RyaDetailsRepositoryException {
// Install an instance of Rya.
final String instanceName = getRyaInstanceName();
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ getUsername(),
+ getPassword().toCharArray(),
+ getInstanceName(),
+ getZookeepers());
+
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
final InstallConfiguration installConfig = InstallConfiguration.builder()
.setEnableTableHashPrefix(false)
.setEnableEntityCentricIndex(false)
@@ -48,39 +55,58 @@ public class AccumuloInstallIT extends AccumuloITBase {
.setEnableTemporalIndex(false)
.setEnablePcjIndex(false)
.setEnableGeoIndex(false)
- .setFluoPcjAppName("fluo_app_name")
.build();
+
+ ryaClient.getInstall().install(instanceName, installConfig);
+
+ // Check that the instance exists.
+ assertTrue( ryaClient.getInstanceExists().exists(instanceName) );
+ }
+
+ @Test
+ public void install_withIndexers() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException, NotInitializedException, RyaDetailsRepositoryException {
+ // Install an instance of Rya.
+ final String instanceName = getRyaInstanceName();
final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
- getUsername(),
- getPassword().toCharArray(),
- getInstanceName(),
- getZookeepers());
+ getUsername(),
+ getPassword().toCharArray(),
+ getInstanceName(),
+ getZookeepers());
+
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+ final InstallConfiguration installConfig = InstallConfiguration.builder()
+ .setEnableTableHashPrefix(true)
+ .setEnableEntityCentricIndex(true)
+ .setEnableFreeTextIndex(true)
+ .setEnableTemporalIndex(true)
+ .setEnablePcjIndex(true)
+ .setEnableGeoIndex(true)
+ .build();
- final Install install = new AccumuloInstall(connectionDetails, getConnector());
- install.install(instanceName, installConfig);
+ ryaClient.getInstall().install(instanceName, installConfig);
// Check that the instance exists.
- final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector());
- assertTrue( instanceExists.exists(instanceName) );
+ assertTrue( ryaClient.getInstanceExists().exists(instanceName) );
}
@Test(expected = DuplicateInstanceNameException.class)
public void install_alreadyExists() throws DuplicateInstanceNameException, RyaClientException, AccumuloException, AccumuloSecurityException {
// Install an instance of Rya.
final String instanceName = getRyaInstanceName();
- final InstallConfiguration installConfig = InstallConfiguration.builder().build();
-
final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
- getUsername(),
- getPassword().toCharArray(),
- getInstanceName(),
- getZookeepers());
+ getUsername(),
+ getPassword().toCharArray(),
+ getInstanceName(),
+ getZookeepers());
- final Install install = new AccumuloInstall(connectionDetails, getConnector());
- install.install(instanceName, installConfig);
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+
+ final InstallConfiguration installConfig = InstallConfiguration.builder().build();
+ ryaClient.getInstall().install(instanceName, installConfig);
// Install it again.
- install.install(instanceName, installConfig);
+ ryaClient.getInstall().install(instanceName, installConfig);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
index c4e4d54..e88e35c 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java
@@ -22,11 +22,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.rya.accumulo.AccumuloITBase;
-import org.apache.rya.api.client.Install;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.InstanceDoesNotExistException;
-import org.apache.rya.api.client.InstanceExists;
-import org.apache.rya.api.client.Uninstall;
+import org.apache.rya.api.client.RyaClient;
import org.junit.Test;
/**
@@ -62,20 +60,16 @@ public class AccumuloUninstallIT extends AccumuloITBase {
getPassword().toCharArray(),
getInstanceName(),
getZookeepers());
-
- final Install install = new AccumuloInstall(connectionDetails, getConnector());
- final String ryaInstanceName = getRyaInstanceName();
- install.install(ryaInstanceName, installConfig);
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
+ ryaClient.getInstall().install(getRyaInstanceName(), installConfig);
// Check that the instance exists.
- final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector());
- assertTrue( instanceExists.exists(ryaInstanceName) );
+ assertTrue( ryaClient.getInstanceExists().exists(getRyaInstanceName()) );
// Uninstall the instance of Rya.
- final Uninstall uninstall = new AccumuloUninstall(connectionDetails, getConnector());
- uninstall.uninstall(ryaInstanceName);
+ ryaClient.getUninstall().uninstall(getRyaInstanceName());
// Verify that it no longer exists.
- assertFalse( instanceExists.exists(ryaInstanceName) );
+ assertFalse( ryaClient.getInstanceExists().exists(getRyaInstanceName()) );
}
}
\ No newline at end of file