You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and was not preserved in this copy.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/05/13 16:42:48 UTC
[6/6] incubator-rya git commit: RYA-64 - Integrated Rya PCJ Secondary Index support into core Rya.
RYA-64 - Integrated Rya PCJ Secondary Index support into core Rya.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/14073a23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/14073a23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/14073a23
Branch: refs/heads/develop
Commit: 14073a23fab3c8c9dca37747d732b83793aee3a3
Parents: 9efae3e
Author: Kevin Chilton <ke...@localhost.localdomain>
Authored: Thu Apr 14 10:34:40 2016 -0400
Committer: Kevin Chilton <ke...@parsons.com>
Committed: Fri May 13 11:26:19 2016 -0400
----------------------------------------------------------------------
extras/indexing/pom.xml | 16 +-
.../accumulo/precompQuery/AccumuloPcjQuery.java | 67 +-
.../mvm/rya/indexing/accumulo/ConfigUtils.java | 226 +++--
.../indexing/accumulo/VisibilityBindingSet.java | 90 --
.../indexing/external/BindingSetDecorator.java | 105 ---
.../indexing/external/PrecompJoinOptimizer.java | 30 +-
.../external/PrecomputedJoinIndexer.java | 255 ++++++
.../external/PrecomputedJoinIndexerConfig.java | 113 +++
.../PrecomputedJoinStorageSupplier.java | 78 ++
.../PrecomputedJoinUpdaterSupplier.java | 78 ++
.../external/accumulo/AccumuloPcjStorage.java | 96 +++
.../accumulo/AccumuloPcjStorageConfig.java | 57 ++
.../accumulo/AccumuloPcjStorageSupplier.java | 77 ++
.../indexing/external/fluo/FluoPcjUpdater.java | 89 ++
.../external/fluo/FluoPcjUpdaterConfig.java | 120 +++
.../external/fluo/FluoPcjUpdaterSupplier.java | 93 +++
.../external/tupleSet/AccumuloIndexSet.java | 43 +-
.../tupleSet/AccumuloPcjSerializer.java | 187 -----
.../external/tupleSet/BindingSetConverter.java | 108 ---
.../tupleSet/BindingSetStringConverter.java | 149 ----
.../indexing/external/tupleSet/PcjTables.java | 833 -------------------
.../VisibilityBindingSetStringConverter.java | 62 --
.../AccumuloConstantPcjIntegrationTest.java | 10 +-
.../external/AccumuloPcjIntegrationTest.java | 16 +-
.../indexing/external/PCJOptionalTestIT.java | 18 +-
.../external/PcjIntegrationTestingUtil.java | 37 +-
.../PrecompJoinOptimizerIntegrationTest.java | 10 +-
.../PrecomputedJoinStorageSupplierTest.java | 79 ++
.../PrecomputedJoinUpdaterSupplierTest.java | 79 ++
.../external/tupleSet/AccumuloIndexSetTest.java | 40 +-
.../tupleSet/AccumuloPcjSerialzerTest.java | 173 ----
.../tupleSet/BindingSetStringConverterTest.java | 310 -------
.../tupleSet/PcjTablesIntegrationTests.java | 438 ----------
.../external/tupleSet/PcjTablesTests.java | 84 --
...VisibilityBindingSetStringConverterTest.java | 132 ---
.../src/main/java/RyaDirectExample.java | 6 +-
extras/pom.xml | 1 +
extras/rya.indexing.pcj/.gitignore | 1 +
extras/rya.indexing.pcj/pom.xml | 84 ++
.../rya/indexing/pcj/storage/PcjException.java | 47 ++
.../rya/indexing/pcj/storage/PcjMetadata.java | 113 +++
.../pcj/storage/PrecomputedJoinStorage.java | 128 +++
.../storage/accumulo/AccumuloPcjSerializer.java | 185 ++++
.../storage/accumulo/BindingSetConverter.java | 106 +++
.../storage/accumulo/BindingSetDecorator.java | 105 +++
.../accumulo/BindingSetStringConverter.java | 148 ++++
.../storage/accumulo/PcjTableNameFactory.java | 73 ++
.../pcj/storage/accumulo/PcjTables.java | 653 +++++++++++++++
.../storage/accumulo/PcjVarOrderFactory.java | 37 +
.../storage/accumulo/ShiftVarOrderFactory.java | 55 ++
.../pcj/storage/accumulo/VariableOrder.java | 117 +++
.../storage/accumulo/VisibilityBindingSet.java | 88 ++
.../VisibilityBindingSetStringConverter.java | 59 ++
.../pcj/update/PrecomputedJoinUpdater.java | 118 +++
.../accumulo/AccumuloPcjSerialzerTest.java | 175 ++++
.../accumulo/BindingSetStringConverterTest.java | 311 +++++++
.../accumulo/PcjTablesIntegrationTests.java | 546 ++++++++++++
.../pcj/storage/accumulo/PcjTablesTests.java | 84 ++
...VisibilityBindingSetStringConverterTest.java | 133 +++
extras/rya.pcj.fluo/pcj.fluo.api/pom.xml | 4 +
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 12 +-
.../indexing/pcj/fluo/api/GetPcjMetadata.java | 6 +-
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 8 +-
.../pcj/fluo/app/FilterResultUpdater.java | 8 +-
.../pcj/fluo/app/JoinResultUpdater.java | 10 +-
.../pcj/fluo/app/QueryResultUpdater.java | 8 +-
.../app/export/IncrementalResultExporter.java | 3 +-
.../fluo/app/export/rya/RyaResultExporter.java | 6 +-
.../export/rya/RyaResultExporterFactory.java | 2 +-
.../fluo/app/observers/BindingSetUpdater.java | 4 +-
.../pcj/fluo/app/observers/FilterObserver.java | 6 +-
.../pcj/fluo/app/observers/JoinObserver.java | 6 +-
.../fluo/app/observers/QueryResultObserver.java | 6 +-
.../app/observers/StatementPatternObserver.java | 6 +-
.../pcj/fluo/app/observers/TripleObserver.java | 6 +-
.../pcj/fluo/app/query/CommonNodeMetadata.java | 3 +-
.../pcj/fluo/app/query/FilterMetadata.java | 3 +-
.../fluo/app/query/FluoQueryMetadataDAO.java | 2 +-
.../pcj/fluo/app/query/JoinMetadata.java | 3 +-
.../pcj/fluo/app/query/QueryMetadata.java | 3 +-
.../fluo/app/query/SparqlFluoQueryBuilder.java | 3 +-
.../app/query/StatementPatternMetadata.java | 3 +-
.../pcj/fluo/app/LeftOuterJoinTest.java | 3 +-
.../indexing/pcj/fluo/app/NaturalJoinTest.java | 3 +-
.../fluo/client/command/ListQueriesCommand.java | 2 +-
.../fluo/client/command/NewQueryCommand.java | 2 +-
.../fluo/client/util/ParsedQueryRequest.java | 3 +-
.../fluo/client/util/PcjMetadataRenderer.java | 5 +-
.../pcj/fluo/client/ParsedQueryRequestTest.java | 3 +-
.../fluo/client/PcjMetadataRendererTest.java | 5 +-
.../rya/indexing/pcj/fluo/demo/DemoDriver.java | 14 +-
.../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 12 +-
.../apache/rya/indexing/pcj/fluo/ITBase.java | 18 +-
.../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 7 +-
.../indexing/pcj/fluo/api/GetQueryReportIT.java | 4 +-
.../indexing/pcj/fluo/api/ListQueryIdsIT.java | 2 +-
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 2 +-
.../indexing/pcj/fluo/integration/InputIT.java | 2 +-
.../indexing/pcj/fluo/integration/QueryIT.java | 2 +-
.../pcj/fluo/integration/RyaExportIT.java | 12 +-
.../pcj/fluo/visibility/PcjVisibilityIT.java | 20 +-
extras/rya.pcj.fluo/pom.xml | 32 -
pom.xml | 38 +
103 files changed, 4996 insertions(+), 3087 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 53e516d..d819199 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -76,16 +76,24 @@ under the License.
<artifactId>geomesa-accumulo-datastore</artifactId>
</dependency>
+ <!-- PCJ Indexing -->
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing.pcj</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.api</artifactId>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
<dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-minicluster</artifactId>
- <version>${accumulo.version}</version>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
index dd1e9c9..f6b7819 100644
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java
@@ -1,5 +1,32 @@
package mvm.rya.accumulo.precompQuery;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -24,38 +51,10 @@ package mvm.rya.accumulo.precompQuery;
*/
import info.aduna.iteration.CloseableIteration;
import info.aduna.iteration.Iteration;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.indexing.PcjQuery;
import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
-import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
-import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
-import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
/**
* This class encapsulates how pre-computed join tables are used during query
@@ -71,10 +70,10 @@ import com.google.common.collect.Sets;
*/
public class AccumuloPcjQuery implements PcjQuery {
private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
-
+
private final Connector accCon;
private final String tableName;
-
+
public AccumuloPcjQuery(Connector accCon, String tableName) {
this.accCon = accCon;
this.tableName = tableName;
@@ -324,7 +323,7 @@ public class AccumuloPcjQuery implements PcjQuery {
bSet.addBinding(var, bs.getBinding(var).getValue());
}
}
-
+
return converter.convert(bSet, new VariableOrder(prefixVars));
}
@@ -333,17 +332,17 @@ public class AccumuloPcjQuery implements PcjQuery {
* @param key - Accumulo key obtained from scan
* @param tableVarMap - map that associated query variables and table variables
* @return - BindingSet without values associated with constant constraints
- * @throws BindingSetConversionException
+ * @throws BindingSetConversionException
*/
private static BindingSet getBindingSetWithoutConstants(Key key,
Map<String, String> tableVarMap) throws BindingSetConversionException {
final byte[] row = key.getRow().getBytes();
final String[] varOrder = key.getColumnFamily().toString()
.split(ExternalTupleSet.VAR_ORDER_DELIM);
-
+
BindingSet bindingSet = converter.convert(row, new VariableOrder(varOrder));
final QueryBindingSet temp = new QueryBindingSet(bindingSet);
-
+
final QueryBindingSet bs = new QueryBindingSet();
for (final String var : temp.getBindingNames()) {
if (!tableVarMap.get(var).startsWith(ExternalTupleSet.CONST_PREFIX)) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
index cf98078..6c87182 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,20 +25,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.indexing.FilterFunctionOptimizer;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
-import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
-import mvm.rya.indexing.accumulo.freetext.Tokenizer;
-import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
-import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import mvm.rya.indexing.external.PrecompJoinOptimizer;
-import mvm.rya.indexing.mongodb.MongoFreeTextIndexer;
-import mvm.rya.indexing.mongodb.MongoGeoIndexer;
-
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
@@ -63,6 +49,20 @@ import org.openrdf.model.impl.URIImpl;
import com.google.common.collect.Lists;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.indexing.FilterFunctionOptimizer;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
+import mvm.rya.indexing.accumulo.freetext.Tokenizer;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import mvm.rya.indexing.external.PrecompJoinOptimizer;
+import mvm.rya.indexing.mongodb.MongoFreeTextIndexer;
+import mvm.rya.indexing.mongodb.MongoGeoIndexer;
+
/**
* A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
*/
@@ -88,17 +88,17 @@ public class ConfigUtils {
public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions";
public static final String TEMPORAL_TABLENAME = "sc.temporal.index";
public static final String ENTITY_TABLENAME = "sc.entity.index";
-
+
public static final String USE_GEO = "sc.use_geo";
public static final String USE_FREETEXT = "sc.use_freetext";
public static final String USE_TEMPORAL = "sc.use_temporal";
public static final String USE_ENTITY = "sc.use_entity";
public static final String USE_PCJ = "sc.use_pcj";
public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
-
+
public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail";
public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail";
-
+
public static final String USE_MOCK_INSTANCE = ".useMockInstance";
public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
@@ -118,22 +118,22 @@ public class ConfigUtils {
public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
-
+
public static final String USE_MONGO = "sc.useMongo";
- public static boolean isDisplayQueryPlan(Configuration conf){
+ public static boolean isDisplayQueryPlan(final Configuration conf){
return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
}
-
+
/**
* get a value from the configuration file and throw an exception if the value does not exist.
- *
+ *
* @param conf
* @param key
* @return
*/
- private static String getStringCheckSet(Configuration conf, String key) {
- String value = conf.get(key);
+ private static String getStringCheckSet(final Configuration conf, final String key) {
+ final String value = conf.get(key);
Validate.notNull(value, key + " not set");
return value;
}
@@ -146,9 +146,9 @@ public class ConfigUtils {
* @throws AccumuloSecurityException
* @throws TableExistsException
*/
- public static boolean createTableIfNotExists(Configuration conf, String tablename) throws AccumuloException, AccumuloSecurityException,
+ public static boolean createTableIfNotExists(final Configuration conf, final String tablename) throws AccumuloException, AccumuloSecurityException,
TableExistsException {
- TableOperations tops = getConnector(conf).tableOperations();
+ final TableOperations tops = getConnector(conf).tableOperations();
if (!tops.exists(tablename)) {
logger.info("Creating table: " + tablename);
tops.create(tablename);
@@ -156,102 +156,102 @@ public class ConfigUtils {
}
return false;
}
-
- private static String getIndexTableName(Configuration conf, String indexTableNameConf, String altSuffix){
+
+ private static String getIndexTableName(final Configuration conf, final String indexTableNameConf, final String altSuffix){
String value = conf.get(indexTableNameConf);
if (value == null){
- String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+ final String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
Validate.notNull(defaultTableName, indexTableNameConf + " not set and " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. Cannot generate table name.");
value = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX) + altSuffix;
}
return value;
}
- public static String getFreeTextDocTablename(Configuration conf) {
+ public static String getFreeTextDocTablename(final Configuration conf) {
return getIndexTableName(conf, FREE_TEXT_DOC_TABLENAME, "freetext");
}
- public static String getFreeTextTermTablename(Configuration conf) {
+ public static String getFreeTextTermTablename(final Configuration conf) {
return getIndexTableName(conf, FREE_TEXT_TERM_TABLENAME, "freetext_term");
}
- public static int getFreeTextTermLimit(Configuration conf) {
+ public static int getFreeTextTermLimit(final Configuration conf) {
return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
}
- public static String getGeoTablename(Configuration conf) {
+ public static String getGeoTablename(final Configuration conf) {
return getIndexTableName(conf, GEO_TABLENAME, "geo");
}
-
- public static String getTemporalTableName(Configuration conf) {
+
+ public static String getTemporalTableName(final Configuration conf) {
return getIndexTableName(conf, TEMPORAL_TABLENAME, "temporal");
}
-
-
- public static String getEntityTableName(Configuration conf) {
+
+
+ public static String getEntityTableName(final Configuration conf) {
return getIndexTableName(conf, ENTITY_TABLENAME, "entity");
}
- public static Set<URI> getFreeTextPredicates(Configuration conf) {
+ public static Set<URI> getFreeTextPredicates(final Configuration conf) {
return getPredicates(conf, FREETEXT_PREDICATES_LIST);
}
- public static Set<URI> getGeoPredicates(Configuration conf) {
+ public static Set<URI> getGeoPredicates(final Configuration conf) {
return getPredicates(conf, GEO_PREDICATES_LIST);
}
/**
- * Used for indexing statements about date & time instances and intervals.
+ * Used for indexing statements about date & time instances and intervals.
* @param conf
* @return Set of predicate URI's whose objects should be date time literals.
*/
- public static Set<URI> getTemporalPredicates(Configuration conf) {
+ public static Set<URI> getTemporalPredicates(final Configuration conf) {
return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
}
- private static Set<URI> getPredicates(Configuration conf, String confName) {
- String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
- Set<URI> predicates = new HashSet<URI>();
- for (String prediateString : validPredicateStrings) {
+ private static Set<URI> getPredicates(final Configuration conf, final String confName) {
+ final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
+ final Set<URI> predicates = new HashSet<URI>();
+ for (final String prediateString : validPredicateStrings) {
predicates.add(new URIImpl(prediateString));
}
return predicates;
}
- public static Tokenizer getFreeTextTokenizer(Configuration conf) {
- Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
+ public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
+ final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
return ReflectionUtils.newInstance(c, conf);
}
- public static BatchWriter createDefaultBatchWriter(String tablename, Configuration conf) throws TableNotFoundException,
+ public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) throws TableNotFoundException,
AccumuloException, AccumuloSecurityException {
- Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
- Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
- Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
- Connector connector = ConfigUtils.getConnector(conf);
+ final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+ final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+ final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
}
- public static MultiTableBatchWriter createMultitableBatchWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException {
- Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
- Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
- Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
- Connector connector = ConfigUtils.getConnector(conf);
+ public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
+ final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+ final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+ final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
}
- public static Scanner createScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException,
+ public static Scanner createScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
- Connector connector = ConfigUtils.getConnector(conf);
- Authorizations auths = ConfigUtils.getAuthorizations(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final Authorizations auths = ConfigUtils.getAuthorizations(conf);
return connector.createScanner(tablename, auths);
}
- public static BatchScanner createBatchScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException,
+ public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
- Connector connector = ConfigUtils.getConnector(conf);
- Authorizations auths = ConfigUtils.getAuthorizations(conf);
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final Authorizations auths = ConfigUtils.getAuthorizations(conf);
Integer numThreads = null;
if (conf instanceof RdfCloudTripleStoreConfiguration)
numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
@@ -260,123 +260,123 @@ public class ConfigUtils {
return connector.createBatchScanner(tablename, auths, numThreads);
}
- public static int getWriterMaxWriteThreads(Configuration conf) {
+ public static int getWriterMaxWriteThreads(final Configuration conf) {
return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
}
- public static long getWriterMaxLatency(Configuration conf) {
+ public static long getWriterMaxLatency(final Configuration conf) {
return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY);
}
- public static long getWriterMaxMemory(Configuration conf) {
+ public static long getWriterMaxMemory(final Configuration conf) {
return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
}
- public static String getUsername(JobContext job) {
+ public static String getUsername(final JobContext job) {
return getUsername(job.getConfiguration());
}
- public static String getUsername(Configuration conf) {
+ public static String getUsername(final Configuration conf) {
return conf.get(CLOUDBASE_USER);
}
- public static Authorizations getAuthorizations(JobContext job) {
+ public static Authorizations getAuthorizations(final JobContext job) {
return getAuthorizations(job.getConfiguration());
}
- public static Authorizations getAuthorizations(Configuration conf) {
- String authString = conf.get(CLOUDBASE_AUTHS, "");
+ public static Authorizations getAuthorizations(final Configuration conf) {
+ final String authString = conf.get(CLOUDBASE_AUTHS, "");
if (authString.isEmpty()) {
return new Authorizations();
}
return new Authorizations(authString.split(","));
}
- public static Instance getInstance(JobContext job) {
+ public static Instance getInstance(final JobContext job) {
return getInstance(job.getConfiguration());
}
- public static Instance getInstance(Configuration conf) {
+ public static Instance getInstance(final Configuration conf) {
if (useMockInstance(conf)) {
return new MockInstance(conf.get(CLOUDBASE_INSTANCE));
}
return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS));
}
- public static String getPassword(JobContext job) {
+ public static String getPassword(final JobContext job) {
return getPassword(job.getConfiguration());
}
- public static String getPassword(Configuration conf) {
+ public static String getPassword(final Configuration conf) {
return conf.get(CLOUDBASE_PASSWORD, "");
}
- public static Connector getConnector(JobContext job) throws AccumuloException, AccumuloSecurityException {
+ public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
return getConnector(job.getConfiguration());
}
- public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException {
- Instance instance = ConfigUtils.getInstance(conf);
+ public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
+ final Instance instance = ConfigUtils.getInstance(conf);
return instance.getConnector(getUsername(conf), getPassword(conf));
}
- public static boolean useMockInstance(Configuration conf) {
+ public static boolean useMockInstance(final Configuration conf) {
return conf.getBoolean(USE_MOCK_INSTANCE, false);
}
- private static int getNumPartitions(Configuration conf) {
+ private static int getNumPartitions(final Configuration conf) {
return conf.getInt(NUM_PARTITIONS, 25);
}
- public static int getFreeTextDocNumPartitions(Configuration conf) {
+ public static int getFreeTextDocNumPartitions(final Configuration conf) {
return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
}
- public static int getFreeTextTermNumPartitions(Configuration conf) {
+ public static int getFreeTextTermNumPartitions(final Configuration conf) {
return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
}
- public static int getGeoNumPartitions(Configuration conf) {
+ public static int getGeoNumPartitions(final Configuration conf) {
return conf.getInt(GEO_NUM_PARTITIONS, getNumPartitions(conf));
}
-
- public static boolean getUseGeo(Configuration conf) {
+
+ public static boolean getUseGeo(final Configuration conf) {
return conf.getBoolean(USE_GEO, false);
}
-
- public static boolean getUseFreeText(Configuration conf) {
+
+ public static boolean getUseFreeText(final Configuration conf) {
return conf.getBoolean(USE_FREETEXT, false);
}
-
- public static boolean getUseTemporal(Configuration conf) {
+
+ public static boolean getUseTemporal(final Configuration conf) {
return conf.getBoolean(USE_TEMPORAL, false);
}
-
- public static boolean getUseEntity(Configuration conf) {
+
+ public static boolean getUseEntity(final Configuration conf) {
return conf.getBoolean(USE_ENTITY, false);
}
-
- public static boolean getUsePCJ(Configuration conf) {
+
+ public static boolean getUsePCJ(final Configuration conf) {
return conf.getBoolean(USE_PCJ, false);
}
-
- public static boolean getUseOptimalPCJ(Configuration conf) {
+
+ public static boolean getUseOptimalPCJ(final Configuration conf) {
return conf.getBoolean(USE_OPTIMAL_PCJ, false);
}
-
- public static boolean getUseMongo(Configuration conf) {
+
+ public static boolean getUseMongo(final Configuration conf) {
return conf.getBoolean(USE_MONGO, false);
}
-
-
- public static void setIndexers(RdfCloudTripleStoreConfiguration conf) {
-
- List<String> indexList = Lists.newArrayList();
- List<String> optimizers = Lists.newArrayList();
-
- boolean useFilterIndex = false;
-
+
+
+ public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
+
+ final List<String> indexList = Lists.newArrayList();
+ final List<String> optimizers = Lists.newArrayList();
+
+ boolean useFilterIndex = false;
+
if (ConfigUtils.getUseMongo(conf)) {
if (getUseGeo(conf)) {
indexList.add(MongoGeoIndexer.class.getName());
@@ -408,7 +408,7 @@ public class ConfigUtils {
}
}
-
+
if (useFilterIndex) {
optimizers.add(FilterFunctionOptimizer.class.getName());
}
@@ -418,12 +418,8 @@ public class ConfigUtils {
optimizers.add(EntityOptimizer.class.getName());
}
-
+
conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
conf.setStrings(AccumuloRdfConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
-
}
-
-
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java
deleted file mode 100644
index b9e2351..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.indexing.accumulo;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.openrdf.query.BindingSet;
-
-import mvm.rya.indexing.external.BindingSetDecorator;
-
-/**
- * Decorates a {@link BindingSet} with a collection of visibilities.
- */
-@ParametersAreNonnullByDefault
-public class VisibilityBindingSet extends BindingSetDecorator {
- private static final long serialVersionUID = 1L;
- private final String visibility;
- private volatile int hashCode;
-
- /**
- * @param set - Decorates the {@link BindingSet} with no visibilities.
- */
- public VisibilityBindingSet(final BindingSet set) {
- this(set, "");
- }
-
- /**
- * Creates a new {@link VisibilityBindingSet}
- * @param set - The {@link BindingSet} to decorate
- * @param visibility - The visibilities on the {@link BindingSet} (not null)
- */
- public VisibilityBindingSet(final BindingSet set, final String visibility) {
- super(set);
- this.visibility = checkNotNull(visibility);
- }
-
- /**
- * @return - The Visibilities on the {@link BindingSet}
- */
- public String getVisibility() {
- return visibility;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- } else if(o instanceof VisibilityBindingSet) {
- final VisibilityBindingSet other = (VisibilityBindingSet) o;
- return set.equals(other) && visibility.equals(other.getVisibility());
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- int result = hashCode;
- if(result == 0) {
- result = 31 * result + visibility.hashCode();
- result = 31 * result + super.hashCode();
- hashCode = result;
- }
- return result;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder(super.toString());
- sb.append("\n Visibility: " + getVisibility() + "\n");
- return sb.toString();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java
deleted file mode 100644
index b4909bd..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.indexing.external;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.openrdf.model.Value;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
-
-/**
- * Abstracts out the decoration of a {@link BindingSet}.
- */
-public abstract class BindingSetDecorator implements BindingSet {
- private static final long serialVersionUID = 1L;
- protected final BindingSet set;
- private volatile int hashCode;
-
- /**
- * Constructs a new {@link BindingSetDecorator}, decorating the provided
- * {@link BindingSet}.
- * @param set - The {@link BindingSet} to be decorated. (not null)
- */
- public BindingSetDecorator(final BindingSet set) {
- this.set = checkNotNull(set);
- }
-
- @Override
- public Iterator<Binding> iterator() {
- return set.iterator();
- }
-
- @Override
- public Set<String> getBindingNames() {
- return set.getBindingNames();
- }
-
- @Override
- public Binding getBinding(final String bindingName) {
- return set.getBinding(bindingName);
- }
-
- @Override
- public boolean hasBinding(final String bindingName) {
- return set.hasBinding(bindingName);
- }
-
- @Override
- public Value getValue(final String bindingName) {
- return set.getValue(bindingName);
- }
-
- @Override
- public int size() {
- return set.size();
- }
-
- @Override
- public boolean equals(final Object o) {
- if(!(o instanceof BindingSetDecorator)) {
- return false;
- }
- final BindingSetDecorator other = (BindingSetDecorator) o;
- return set.equals(other.set);
- }
-
- @Override
- public int hashCode() {
- int result = hashCode;
- if(result == 0) {
- result = 31 * result + set.hashCode();
- hashCode = result;
- }
- return result;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append(" names: ");
- for (final String name : getBindingNames()) {
- sb.append("\n [name]: " + name + " --- [value]: " + getBinding(name).getValue().toString());
- }
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
index f8c6c77..75497da 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java
@@ -27,27 +27,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator;
-import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
-import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector;
-import mvm.rya.indexing.IndexPlanValidator.TupleReArranger;
-import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector;
-import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
-import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
-import mvm.rya.indexing.external.tupleSet.PcjTables;
-import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
-import mvm.rya.rdftriplestore.inference.DoNotExpandSP;
-import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
-
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
import org.openrdf.query.BindingSet;
import org.openrdf.query.Dataset;
import org.openrdf.query.MalformedQueryException;
@@ -82,6 +69,19 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator;
+import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
+import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector;
+import mvm.rya.indexing.IndexPlanValidator.TupleReArranger;
+import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector;
+import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet;
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+import mvm.rya.rdftriplestore.inference.DoNotExpandSP;
+import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
+
/**
* {@link QueryOptimizer} which matches TupleExpressions associated with
* pre-computed queries to sub-queries of a given query. Each matched sub-query
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
new file mode 100644
index 0000000..cce0a81
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
+import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier;
+
+/**
+ * Updates the state of the Precomputed Join indices that are used by Rya.
+ * <p>
+ * The {@link PrecomputedJoinStorage} and {@link PrecomputedJoinUpdater} are
+ * obtained from their suppliers the first time they are needed and are then
+ * reused, so the updater that receives statements is the same instance that
+ * is later flushed and closed.
+ */
+@ParametersAreNonnullByDefault
+public class PrecomputedJoinIndexer implements AccumuloIndexer {
+    private static final Logger log = Logger.getLogger(PrecomputedJoinIndexer.class);
+
+    /**
+     * This configuration object must be set before {@link #init()} is invoked.
+     * It is set by {@link #setConf(Configuration)}.
+     */
+    private Optional<Configuration> conf = Optional.absent();
+
+    /**
+     * The Accumulo Connector that must be used when accessing an Accumulo storage.
+     * This value is provided by {@link #setConnector(Connector)}.
+     */
+    private Optional<Connector> accumuloConn = Optional.absent();
+
+    /**
+     * Provides access to the {@link Configuration} that was provided to this class
+     * using {@link #setConf(Configuration)}; yields {@code null} until one is set.
+     */
+    private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() {
+        @Override
+        public Configuration get() {
+            return getConf();
+        }
+    };
+
+    /**
+     * Provides access to the Accumulo {@link Connector} that was provided to
+     * this class using {@link #setConnector(Connector)}.
+     */
+    private final Supplier<Connector> accumuloSupplier = new Supplier<Connector>() {
+        @Override
+        public Connector get() {
+            return accumuloConn.get();
+        }
+    };
+
+    /**
+     * Creates the {@link PrecomputedJoinStorage} that will be used
+     * to interact with the PCJ results that are stored and used by Rya.
+     */
+    private final PrecomputedJoinStorageSupplier pcjStorageSupplier =
+            new PrecomputedJoinStorageSupplier(
+                    configSupplier,
+                    new AccumuloPcjStorageSupplier(configSupplier, accumuloSupplier));
+
+    /**
+     * Creates the {@link PrecomputedJoinUpdater} that will
+     * be used to update the state stored within the PCJ tables that are stored
+     * in Accumulo.
+     */
+    private final PrecomputedJoinUpdaterSupplier pcjUpdaterSupplier =
+            new PrecomputedJoinUpdaterSupplier(
+                    configSupplier,
+                    new FluoPcjUpdaterSupplier(configSupplier));
+
+    /**
+     * The storage obtained from {@link #pcjStorageSupplier}. Absent until it is
+     * first needed, and reset to absent by {@link #close()}.
+     */
+    private Optional<PrecomputedJoinStorage> pcjStorage = Optional.absent();
+
+    /**
+     * The updater obtained from {@link #pcjUpdaterSupplier}. Absent until it is
+     * first needed, and reset to absent by {@link #close()}.
+     */
+    private Optional<PrecomputedJoinUpdater> pcjUpdater = Optional.absent();
+
+    @Override
+    public void setConf(final Configuration conf) {
+        this.conf = Optional.fromNullable(conf);
+    }
+
+    /**
+     * @return The configuration provided by {@link #setConf(Configuration)},
+     *   or {@code null} if it has not been set yet.
+     */
+    @Override
+    public Configuration getConf() {
+        // Returning null rather than failing inside Optional#get() lets the PCJ
+        // suppliers report their own "has not been configured" error message.
+        return conf.orNull();
+    }
+
+    /**
+     * Set the connector that will be used by {@link AccumuloPcjStorage} if the
+     * application is configured to store the PCJs within Accumulo.
+     */
+    @Override
+    public void setConnector(final Connector connector) {
+        checkNotNull(connector);
+        accumuloConn = Optional.of( connector );
+    }
+
+    /**
+     * This is invoked when the host {@link RyaDAO#init()} method is invoked.
+     * Creating both components here surfaces configuration errors early.
+     */
+    @Override
+    public void init() {
+        getPcjStorage();
+        getPcjUpdater();
+    }
+
+    /**
+     * @return The cached {@link PrecomputedJoinStorage}, created from
+     *   {@link #pcjStorageSupplier} if it does not exist yet.
+     */
+    private PrecomputedJoinStorage getPcjStorage() {
+        if (!pcjStorage.isPresent()) {
+            pcjStorage = Optional.of(pcjStorageSupplier.get());
+        }
+        return pcjStorage.get();
+    }
+
+    /**
+     * @return The cached {@link PrecomputedJoinUpdater}, created from
+     *   {@link #pcjUpdaterSupplier} if it does not exist yet.
+     */
+    private PrecomputedJoinUpdater getPcjUpdater() {
+        if (!pcjUpdater.isPresent()) {
+            pcjUpdater = Optional.of(pcjUpdaterSupplier.get());
+        }
+        return pcjUpdater.get();
+    }
+
+    @Override
+    public void storeStatement(final RyaStatement statement) throws IOException {
+        checkNotNull(statement);
+        storeStatements( Collections.singleton(statement) );
+    }
+
+    @Override
+    public void storeStatements(final Collection<RyaStatement> statements) throws IOException {
+        checkNotNull(statements);
+        try {
+            getPcjUpdater().addStatements(statements);
+        } catch (final PcjUpdateException e) {
+            throw new IOException("Could not update the PCJs by adding the provided statements.", e);
+        }
+    }
+
+    @Override
+    public void deleteStatement(final RyaStatement statement) throws IOException {
+        checkNotNull(statement);
+        try {
+            getPcjUpdater().deleteStatements( Collections.singleton(statement) );
+        } catch (final PcjUpdateException e) {
+            throw new IOException("Could not update the PCJs by removing the provided statement.", e);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        try {
+            getPcjUpdater().flush();
+        } catch (final PcjUpdateException e) {
+            throw new IOException("Could not flush the PCJ Updater.", e);
+        }
+    }
+
+    /**
+     * Closes the storage and updater instances that were created, if any, and
+     * forgets them so a later call would obtain fresh instances.
+     */
+    @Override
+    public void close() {
+        if (pcjStorage.isPresent()) {
+            try {
+                pcjStorage.get().close();
+            } catch (final PCJStorageException e) {
+                log.error("Could not close the PCJ Storage instance.", e);
+            }
+            pcjStorage = Optional.absent();
+        }
+
+        if (pcjUpdater.isPresent()) {
+            try {
+                pcjUpdater.get().close();
+            } catch (final PcjUpdateException e) {
+                log.error("Could not close the PCJ Updater instance.", e);
+            }
+            pcjUpdater = Optional.absent();
+        }
+    }
+
+    /**
+     * This is invoked when the host {@link RyaDAO#destroy()} method is invoked.
+     */
+    @Override
+    public void destroy() {
+        close();
+    }
+
+    /**
+     * Deletes all data from the PCJ indices that are managed by a {@link PrecomputedJoinStorage}.
+     */
+    @Override
+    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+        final PrecomputedJoinStorage storage = getPcjStorage();
+
+        try {
+            for(final String pcjId : storage.listPcjs()) {
+                try {
+                    storage.purge(pcjId);
+                } catch(final PCJStorageException e) {
+                    log.error("Could not purge the PCJ index with id: " + pcjId, e);
+                }
+            }
+        } catch (final PCJStorageException e) {
+            log.error("Could not purge the PCJ indicies because they could not be listed.", e);
+        }
+    }
+
+    /**
+     * Deletes all of the PCJ indices that are managed by {@link PrecomputedJoinStorage}.
+     */
+    @Override
+    public void dropAndDestroy() {
+        final PrecomputedJoinStorage storage = getPcjStorage();
+
+        try {
+            for(final String pcjId : storage.listPcjs()) {
+                try {
+                    storage.dropPcj(pcjId);
+                } catch(final PCJStorageException e) {
+                    log.error("Could not delete the PCJ index with id: " + pcjId, e);
+                }
+            }
+        } catch(final PCJStorageException e) {
+            log.error("Could not delete the PCJ indicies because they could not be listed.", e);
+        }
+    }
+
+    @Override
+    public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException {
+        // We do not need to use the writer that also writes to the core RYA tables.
+    }
+
+    @Override
+    public void dropGraph(final RyaURI... graphs) {
+        log.warn("PCJ indices do not store Graph metadata, so graph results can not be dropped.");
+    }
+
+    @Override
+    public String getTableName() {
+        // This method makes assumptions about how PCJs are stored. It's only
+        // used by AccumuloRyaDAO to purge data, so it should be replaced with
+        // a purge() method.
+        log.warn("PCJ indicies are not stored within a single table, so this method can not be implemented.");
+        return null;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
new file mode 100644
index 0000000..c56f574
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Optional;
+
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+/**
+ * Inspects the {@link Configuration} object that is provided to all instances
+ * of {@link RyaSecondaryIndexer} to provide {@link PrecomputedJoinIndexer}
+ * specific values.
+ */
+@ParametersAreNonnullByDefault
+public class PrecomputedJoinIndexerConfig {
+
+    /**
+     * Enumerates the different methodologies implemented to store the PCJ indices.
+     */
+    public static enum PrecomputedJoinStorageType {
+        /**
+         * Stores each PCJ within an Accumulo table.
+         */
+        ACCUMULO;
+    }
+
+    /**
+     * Enumerates the different methodologies implemented to update the PCJ indices.
+     */
+    public static enum PrecomputedJoinUpdaterType {
+        /**
+         * Incrementally updates the PCJs in pseudo-realtime as new adds/deletes are encountered.
+         */
+        FLUO;
+    }
+
+    // Indicates which implementation of PrecomputedJoinStorage to use.
+    public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
+
+    // Indicates which implementation of PrecomputedJoinUpdater to use.
+    public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+
+    // The configuration object that is provided to Secondary Indexing implementations.
+    private final Configuration config;
+
+    /**
+     * Constructs an instance of {@link PrecomputedJoinIndexerConfig}.
+     *
+     * @param config - The {@link Configuration} object that is provided to
+     *   all instances of {@link RyaSecondaryIndexer}. It will be inspected
+     *   for {@link PrecomputedJoinIndexer} specific values. (not null)
+     */
+    public PrecomputedJoinIndexerConfig(final Configuration config) {
+        this.config = checkNotNull(config);
+    }
+
+    /**
+     * @return The type of {@link PrecomputedJoinStorage} to use, or absent if
+     *   {@link #PCJ_STORAGE_TYPE} has not been set.
+     * @throws IllegalArgumentException The property is set to a value that
+     *   is not a {@link PrecomputedJoinStorageType} constant name.
+     */
+    public Optional<PrecomputedJoinStorageType> getPcjStorageType() {
+        return getEnumValue(PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.class);
+    }
+
+    /**
+     * @return The type of {@link PrecomputedJoinUpdater} to use, or absent if
+     *   {@link #PCJ_UPDATER_TYPE} has not been set.
+     * @throws IllegalArgumentException The property is set to a value that
+     *   is not a {@link PrecomputedJoinUpdaterType} constant name.
+     */
+    public Optional<PrecomputedJoinUpdaterType> getPcjUpdaterType() {
+        return getEnumValue(PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.class);
+    }
+
+    /**
+     * @return The configuration object that has been wrapped.
+     */
+    public Configuration getConfig() {
+        return config;
+    }
+
+    /**
+     * Reads a property and converts it to a constant of an enum.
+     *
+     * @param key - The name of the property to read. (not null)
+     * @param enumType - The enum whose constant the value must name. (not null)
+     * @return The matching constant, or absent if the property is not set.
+     * @throws IllegalArgumentException The value does not exactly match
+     *   (case-sensitively) the name of one of the enum's constants.
+     */
+    private <E extends Enum<E>> Optional<E> getEnumValue(final String key, final Class<E> enumType) {
+        final String value = config.get(key);
+        if(value == null) {
+            return Optional.absent();
+        }
+
+        try {
+            return Optional.of( Enum.valueOf(enumType, value) );
+        } catch(final IllegalArgumentException e) {
+            throw new IllegalArgumentException("The '" + key + "' property has the unsupported value '" +
+                    value + "'. It must be one of: " + Arrays.toString(enumType.getEnumConstants()), e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
new file mode 100644
index 0000000..bf10c84
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
+
+/**
+ * Creates an instance of {@link PrecomputedJoinStorage} based on the application's configuration.
+ */
+public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinStorage> {
+
+    private final Supplier<Configuration> configSupplier;
+    private final AccumuloPcjStorageSupplier accumuloSupplier;
+
+    /**
+     * Constructs an instance of {@link PrecomputedJoinStorageSupplier}.
+     *
+     * @param configSupplier - Provides access to the configuration of the
+     *   application used to initialize the storage. (not null)
+     * @param accumuloSupplier - Used to create an Accumulo instance of the
+     *   storage if that is the configured type. (not null)
+     */
+    public PrecomputedJoinStorageSupplier(
+            final Supplier<Configuration> configSupplier,
+            final AccumuloPcjStorageSupplier accumuloSupplier) {
+        this.configSupplier = checkNotNull(configSupplier);
+        this.accumuloSupplier = checkNotNull(accumuloSupplier);
+    }
+
+    /**
+     * @return The {@link PrecomputedJoinStorage} selected by the
+     *   {@link PrecomputedJoinIndexerConfig#PCJ_STORAGE_TYPE} property.
+     * @throws NullPointerException The config supplier returned {@code null}.
+     * @throws IllegalArgumentException The storage type property is missing
+     *   or names an unsupported type.
+     */
+    @Override
+    public PrecomputedJoinStorage get() {
+        // Ensure a configuration has been set.
+        final Configuration config = configSupplier.get();
+        checkNotNull(config, "Could not build the PrecomputedJoinStorage until the PrecomputedJoinIndexer has been configured.");
+
+        final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config);
+
+        // Ensure the storage type has been set. The allowed values are wrapped in a List because
+        // concatenating the raw array would print only its type and hash code.
+        final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType();
+        checkArgument(storageType.isPresent(), "The '%s' property must have one of the following values: %s",
+                PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE, Arrays.asList(PrecomputedJoinStorageType.values()));
+
+        // Create and return the configured storage.
+        switch(storageType.get()) {
+            case ACCUMULO:
+                return accumuloSupplier.get();
+
+            default:
+                throw new IllegalArgumentException("Unsupported PrecomputedJoinStorageType: " + storageType.get());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java
new file mode 100644
index 0000000..cabadb4
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier;
+
+/**
+ * Creates an instance of {@link PrecomputedJoinUpdater} based on the application's configuration.
+ */
+public class PrecomputedJoinUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> {
+
+    private final Supplier<Configuration> configSupplier;
+    private final FluoPcjUpdaterSupplier fluoSupplier;
+
+    /**
+     * Creates an instance of {@link PrecomputedJoinUpdaterSupplier}.
+     *
+     * @param configSupplier - Provides access to the configuration of the
+     *   application used to initialize the updater. (not null)
+     * @param fluoSupplier - Used to create a Fluo instance of the updater
+     *   if that is the configured type. (not null)
+     */
+    public PrecomputedJoinUpdaterSupplier(
+            final Supplier<Configuration> configSupplier,
+            final FluoPcjUpdaterSupplier fluoSupplier) {
+        this.configSupplier = checkNotNull(configSupplier);
+        this.fluoSupplier = checkNotNull(fluoSupplier);
+    }
+
+    /**
+     * @return The {@link PrecomputedJoinUpdater} selected by the
+     *   {@link PrecomputedJoinIndexerConfig#PCJ_UPDATER_TYPE} property.
+     * @throws NullPointerException The config supplier returned {@code null}.
+     * @throws IllegalArgumentException The updater type property is missing
+     *   or names an unsupported type.
+     */
+    @Override
+    public PrecomputedJoinUpdater get() {
+        // Ensure a configuration has been set.
+        final Configuration config = configSupplier.get();
+        checkNotNull(config, "Can not build the PrecomputedJoinUpdater until the PrecomputedJoinIndexer has been configured.");
+
+        final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config);
+
+        // Ensure an updater type has been set. The allowed values are wrapped in a List because
+        // concatenating the raw array would print only its type and hash code.
+        final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType();
+        checkArgument(updaterType.isPresent(), "The '%s' property must have one of the following values: %s",
+                PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE, Arrays.asList(PrecomputedJoinUpdaterType.values()));
+
+        // Create and return the configured updater.
+        switch(updaterType.get()) {
+            case FLUO:
+                return fluoSupplier.get();
+
+            default:
+                throw new IllegalArgumentException("Unsupported PrecomputedJoinUpdaterType: " + updaterType.get());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java
new file mode 100644
index 0000000..6e79748
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * A {@link PrecomputedJoinStorage} that keeps Precomputed Join (PCJ) results in
+ * Accumulo tables. Each operation is delegated to {@link PcjTables}; the ids of
+ * newly created PCJs are table names generated for the Rya instance this
+ * storage was constructed with.
+ */
+@ParametersAreNonnullByDefault
+public class AccumuloPcjStorage implements PrecomputedJoinStorage {
+
+    // Generates the table names that are handed back to callers as PCJ ids.
+    private final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory();
+
+    // Performs the reads and writes against the PCJ tables.
+    private final PcjTables pcjTables = new PcjTables();
+
+    private final Connector accumuloConn;
+    private final String ryaInstanceName;
+
+    /**
+     * Creates a storage that reads and writes the PCJ tables of one Rya instance.
+     *
+     * @param accumuloConn - Connection used for every Accumulo operation. (not null)
+     * @param ryaInstanceName - Name of the Rya instance whose PCJs are accessed. (not null)
+     */
+    public AccumuloPcjStorage(final Connector accumuloConn, final String ryaInstanceName) {
+        this.accumuloConn = checkNotNull(accumuloConn);
+        this.ryaInstanceName = checkNotNull(ryaInstanceName);
+    }
+
+    @Override
+    public List<String> listPcjs() throws PCJStorageException {
+        final List<String> pcjIds = pcjTables.listPcjTables(accumuloConn, ryaInstanceName);
+        return pcjIds;
+    }
+
+    @Override
+    public String createPcj(final String sparql, final Set<VariableOrder> varOrders) throws PCJStorageException {
+        // The generated table name doubles as the new PCJ's id.
+        final String tableName = tableNameFactory.makeTableName(ryaInstanceName);
+        pcjTables.createPcjTable(accumuloConn, tableName, varOrders, sparql);
+        return tableName;
+    }
+
+    @Override
+    public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException {
+        final PcjMetadata metadata = pcjTables.getPcjMetadata(accumuloConn, pcjId);
+        return metadata;
+    }
+
+    @Override
+    public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException {
+        pcjTables.addResults(accumuloConn, pcjId, results);
+    }
+
+    @Override
+    public void purge(final String pcjId) throws PCJStorageException {
+        pcjTables.purgePcjTable(accumuloConn, pcjId);
+    }
+
+    @Override
+    public void dropPcj(final String pcjId) throws PCJStorageException {
+        pcjTables.dropPcjTable(accumuloConn, pcjId);
+    }
+
+    /**
+     * Intentionally does nothing; an Accumulo {@link Connector} holds no
+     * resources that must be released here.
+     */
+    @Override
+    public void close() throws PCJStorageException {
+        // No-op.
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java
new file mode 100644
index 0000000..73b50db
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.hadoop.conf.Configuration;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+
+/**
+ * Interprets the values of a Hadoop {@link Configuration} that are needed to
+ * initialize an {@link AccumuloPcjStorage}.
+ */
+public class AccumuloPcjStorageConfig {
+
+    // The application's configuration, viewed as Rya's configuration type so that
+    // its getTablePrefix() method is available.
+    private final RdfCloudTripleStoreConfiguration config;
+
+    /**
+     * Constructs an instance of {@link AccumuloPcjStorageConfig}.
+     *
+     * @param config - The configuration values that will be interpreted. (not null)
+     */
+    public AccumuloPcjStorageConfig(final Configuration config) {
+        checkNotNull(config);
+        this.config = new RyaConfigView(config);
+    }
+
+    /**
+     * @return The Rya Instance name the storage grants access to, which is the
+     *   configuration's table prefix.
+     */
+    public String getRyaInstanceName() {
+        return config.getTablePrefix();
+    }
+
+    /**
+     * Minimal concrete {@link RdfCloudTripleStoreConfiguration} used only to read
+     * Rya settings (such as the table prefix) out of a plain {@link Configuration}.
+     * Being a static nested class, it does not hold a reference to the enclosing
+     * {@link AccumuloPcjStorageConfig}.
+     */
+    private static final class RyaConfigView extends RdfCloudTripleStoreConfiguration {
+        RyaConfigView(final Configuration config) {
+            super(config);
+        }
+
+        // This class never clones the view.
+        // NOTE(review): returning null does not honor the clone() contract; confirm
+        // nothing else ever clones this object.
+        @Override
+        public RdfCloudTripleStoreConfiguration clone() {
+            return null;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java
new file mode 100644
index 0000000..77b8f2e
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.accumulo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+
+/**
+ * A {@link Supplier} of {@link AccumuloPcjStorage} instances whose settings are
+ * read from the application's {@link Configuration} each time {@link #get()} is called.
+ */
+public class AccumuloPcjStorageSupplier implements Supplier<AccumuloPcjStorage> {
+
+    private final Supplier<Configuration> configSupplier;
+    private final Supplier<Connector> accumuloSupplier;
+
+    /**
+     * Constructs an instance of {@link AccumuloPcjStorageSupplier}.
+     *
+     * @param configSupplier - Provides the configuration used to build each
+     *   supplied {@link AccumuloPcjStorage}. (not null)
+     * @param accumuloSupplier - Provides the {@link Connector} handed to each
+     *   supplied {@link AccumuloPcjStorage}. (not null)
+     */
+    public AccumuloPcjStorageSupplier(
+            final Supplier<Configuration> configSupplier,
+            final Supplier<Connector> accumuloSupplier) {
+        this.configSupplier = checkNotNull(configSupplier);
+        this.accumuloSupplier = checkNotNull(accumuloSupplier);
+    }
+
+    /**
+     * Builds a new {@link AccumuloPcjStorage} for the configured Rya instance.
+     *
+     * @return A storage backed by the supplied Accumulo {@link Connector}.
+     * @throws NullPointerException If the configuration or the connector has not been provided yet.
+     * @throws IllegalArgumentException If the configured PCJ storage type is not Accumulo.
+     */
+    @Override
+    public AccumuloPcjStorage get() {
+        final Configuration config = requireConfiguration();
+        requireAccumuloStorageType(config);
+        final Connector accumuloConn = requireConnector();
+        return new AccumuloPcjStorage(accumuloConn, new AccumuloPcjStorageConfig(config).getRyaInstanceName());
+    }
+
+    // Returns the current configuration, failing if it has not been provided yet.
+    private Configuration requireConfiguration() {
+        return checkNotNull(configSupplier.get(), "Could not create a AccumuloPcjStorage because the application's configuration has not been provided yet.");
+    }
+
+    // Fails unless the configuration selects Accumulo as the PCJ storage type.
+    private static void requireAccumuloStorageType(final Configuration config) {
+        final Optional<PrecomputedJoinStorageType> storageType =
+                new PrecomputedJoinIndexerConfig(config).getPcjStorageType();
+        final boolean isAccumulo = storageType.isPresent() && (storageType.get() == PrecomputedJoinStorageType.ACCUMULO);
+        checkArgument(isAccumulo,
+                "This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE +
+                "' value be set to '" + PrecomputedJoinStorageType.ACCUMULO + "'.");
+    }
+
+    // Returns the Accumulo connector, failing if it has not been set yet.
+    private Connector requireConnector() {
+        return checkNotNull(accumuloSupplier.get(), "The Accumulo Connector must be set before initializing the AccumuloPcjStorage.");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java
new file mode 100644
index 0000000..901bd61
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Optional;
+
+import io.fluo.api.client.FluoClient;
+import mvm.rya.api.domain.RyaStatement;
+
+/**
+ * Updates the PCJ indices by forwarding statement additions to a Fluo
+ * application. The Fluo application does not support statement deletion, so
+ * deletions are ignored and a warning is logged the first time one is attempted.
+ */
+@ParametersAreNonnullByDefault
+public class FluoPcjUpdater implements PrecomputedJoinUpdater {
+    private static final Logger log = Logger.getLogger(FluoPcjUpdater.class);
+
+    // Used to only print the unsupported delete operation warning once.
+    // NOTE(review): this flag is not synchronized; concurrent callers may each log the warning.
+    private boolean deleteWarningPrinted = false;
+
+    private final FluoClient fluoClient;
+    private final InsertTriples insertTriples = new InsertTriples();
+
+    // Wrapped once at construction instead of on every addStatements() call.
+    private final Optional<String> statementVis;
+
+    /**
+     * Constructs an instance of {@link FluoPcjUpdater}.
+     *
+     * @param fluoClient - A connection to the Fluo table new statements will be
+     *   inserted into. Closed by {@link #close()}. (not null)
+     * @param statementVis - The visibility label that will be applied to all
+     *   statements that are inserted via the Fluo PCJ updater. (not null)
+     */
+    public FluoPcjUpdater(final FluoClient fluoClient, final String statementVis) {
+        this.fluoClient = checkNotNull(fluoClient);
+        this.statementVis = Optional.of(checkNotNull(statementVis));
+    }
+
+    /**
+     * Inserts the statements into the Fluo application, labeled with the
+     * visibility this updater was constructed with.
+     */
+    @Override
+    public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException {
+        insertTriples.insert(fluoClient, statements, statementVis);
+    }
+
+    /**
+     * Does not delete anything: the Fluo application does not support statement
+     * deletion. Logs a warning on the first call only.
+     */
+    @Override
+    public void deleteStatements(final Collection<RyaStatement> statements) throws PcjUpdateException {
+        if(!deleteWarningPrinted) {
+            log.warn("The Fluo PCJ updating application does not support Statement deletion, " +
+                    "but you are trying to use that feature. This may result in your PCJ index " +
+                    "no longer reflecting the Statements that are stored in the core Rya tables.");
+            deleteWarningPrinted = true;
+        }
+    }
+
+    /**
+     * Does nothing; statements are handed to Fluo directly in
+     * {@link #addStatements(Collection)} and are not buffered by this class.
+     */
+    @Override
+    public void flush() {
+        // No-op.
+    }
+
+    /**
+     * Closes the underlying {@link FluoClient}.
+     */
+    @Override
+    public void close() {
+        fluoClient.close();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
new file mode 100644
index 0000000..4378a4a
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Optional;
+
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Configuration values required to initialize a {@link FluoPcjUpdater}.
+ * <p>
+ * Every getter returns {@link Optional#absent()} when the property it reads
+ * has not been set.
+ */
+public final class FluoPcjUpdaterConfig {
+
+    // Defines which Fluo application is running for this instance of Rya.
+    public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+
+    // Values that define which Accumulo instance hosts the Fluo application's table.
+    // These are aliases of keys defined in ConfigUtils.
+    public static final String ACCUMULO_ZOOKEEPERS = ConfigUtils.CLOUDBASE_ZOOKEEPERS;
+    public static final String ACCUMULO_INSTANCE = ConfigUtils.CLOUDBASE_INSTANCE;
+    public static final String ACCUMULO_USERNAME = ConfigUtils.CLOUDBASE_USER;
+    public static final String ACCUMULO_PASSWORD = ConfigUtils.CLOUDBASE_PASSWORD;
+
+    // Defines the visibility applied to statements that are inserted by the Fluo updater.
+    // NOTE(review): this aliases ConfigUtils.CLOUDBASE_AUTHS, so the configured
+    // authorizations double as the statement visibility; confirm this is intended.
+    public static final String STATEMENT_VISIBILITY = ConfigUtils.CLOUDBASE_AUTHS;
+
+    // The configuration object that is provided to Secondary Indexing implementations.
+    private final Configuration config;
+
+    /**
+     * Constructs an instance of {@link FluoPcjUpdaterConfig}.
+     *
+     * @param config - The configuration values that will be interpreted. (not null)
+     */
+    public FluoPcjUpdaterConfig(final Configuration config) {
+        this.config = checkNotNull(config);
+    }
+
+    /**
+     * @return The name of the Fluo Application this instance of RYA is
+     *   using to incrementally update PCJs.
+     */
+    public Optional<String> getFluoAppName() {
+        return getOptional(FLUO_APP_NAME);
+    }
+
+    /**
+     * This value is the {@link #getAccumuloZookeepers()} value with
+     * {@code "/fluo"} appended.
+     *
+     * @return The zookeepers that are used to manage Fluo state, or
+     *   {@link Optional#absent()} if the Accumulo zookeepers are not configured.
+     */
+    public Optional<String> getFluoZookeepers() {
+        final Optional<String> accumuloZookeepers = getAccumuloZookeepers();
+        if(!accumuloZookeepers.isPresent()) {
+            return Optional.absent();
+        }
+        return Optional.of( accumuloZookeepers.get() + "/fluo" );
+    }
+
+    /**
+     * @return The zookeepers used to connect to the Accumulo instance that
+     *   is storing the state of the Fluo Application.
+     */
+    public Optional<String> getAccumuloZookeepers() {
+        return getOptional(ACCUMULO_ZOOKEEPERS);
+    }
+
+    /**
+     * @return The instance name of the Accumulo instance that is storing
+     *   the state of the Fluo Application.
+     */
+    public Optional<String> getAccumuloInstance() {
+        return getOptional(ACCUMULO_INSTANCE);
+    }
+
+    /**
+     * @return The username the indexer will authenticate with when connecting
+     *   to the Accumulo instance that stores the state of the Fluo Application.
+     */
+    public Optional<String> getAccumuloUsername() {
+        return getOptional(ACCUMULO_USERNAME);
+    }
+
+    /**
+     * @return The password the indexer will authenticate with when connecting
+     *   to the Accumulo instance that stores the state of the Fluo Application.
+     */
+    public Optional<String> getAccumuloPassword() {
+        return getOptional(ACCUMULO_PASSWORD);
+    }
+
+    /**
+     * @return The visibility labels that will be attached to the statements
+     *   that are inserted into the Fluo Application.
+     */
+    public Optional<String> getStatementVisibility() {
+        return getOptional(STATEMENT_VISIBILITY);
+    }
+
+    // Reads a property, mapping an unset value to Optional.absent().
+    private Optional<String> getOptional(final String key) {
+        return Optional.fromNullable(config.get(key));
+    }
+}
\ No newline at end of file