You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/07/21 13:15:36 UTC
incubator-rya git commit: RYA-105: Fluo Integration
Repository: incubator-rya
Updated Branches:
refs/heads/develop dbd46e7a7 -> d9aaca79c
RYA-105: Fluo Integration
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/d9aaca79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/d9aaca79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/d9aaca79
Branch: refs/heads/develop
Commit: d9aaca79cc1c69ca049c794f8b35e72e877e808e
Parents: dbd46e7
Author: Caleb Meier <me...@gmail.com>
Authored: Wed Jul 6 12:26:38 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Thu Jul 21 09:09:39 2016 -0400
----------------------------------------------------------------------
.../java/mvm/rya/accumulo/AccumuloRyaDAO.java | 62 ++--
.../mvm/rya/indexing/accumulo/ConfigUtils.java | 48 ++-
.../external/PrecomputedJoinIndexer.java | 130 ++++---
.../external/PrecomputedJoinIndexerConfig.java | 15 +-
.../external/fluo/FluoPcjUpdaterConfig.java | 5 +-
.../external/fluo/FluoPcjUpdaterSupplier.java | 17 +-
.../rya/indexing/external/fluo/NoOpUpdater.java | 53 +++
.../external/fluo/NoOpUpdaterSupplier.java | 39 +++
.../fluo/PcjUpdaterSupplierFactory.java | 57 +++
.../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 47 ++-
.../apache/rya/indexing/pcj/fluo/ITBase.java | 345 +++++++++++--------
.../pcj/fluo/api/CountStatementsIT.java | 23 +-
.../pcj/fluo/integration/RyaExportIT.java | 16 +-
.../RyaInputIncrementalUpdateIT.java | 200 +++++++++++
.../pcj/fluo/visibility/PcjVisibilityIT.java | 18 +-
15 files changed, 776 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
index b10c522..ed991e1 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
@@ -8,9 +8,9 @@ package mvm.rya.accumulo;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,6 +31,7 @@ import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS;
import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA;
import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA;
import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA;
+import info.aduna.iteration.CloseableIteration;
import java.io.IOException;
import java.util.Collection;
@@ -40,6 +41,18 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.resolver.RyaTripleContext;
+
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
@@ -64,21 +77,6 @@ import org.openrdf.model.Namespace;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import info.aduna.iteration.CloseableIteration;
-import mvm.rya.accumulo.experimental.AccumuloIndexer;
-import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.layout.TableLayoutStrategy;
-import mvm.rya.api.persist.RyaDAO;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.persist.RyaNamespaceManager;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
@@ -111,8 +109,9 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
@Override
public void init() throws RyaDAOException {
- if (initialized)
+ if (initialized) {
return;
+ }
try {
checkNotNull(conf);
checkNotNull(connector);
@@ -131,7 +130,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
secondaryIndexers = conf.getAdditionalIndexers();
flushEachUpdate = conf.flushEachUpdate();
-
+
TableOperations tableOperations = connector.tableOperations();
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
@@ -150,7 +149,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
-
+
for (AccumuloIndexer index : secondaryIndexers) {
index.setConnector(connector);
index.setMultiTableBatchWriter(mt_bw);
@@ -248,9 +247,15 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
} catch (Exception e) {
throw new RyaDAOException(e);
} finally {
- if (bd_spo != null) bd_spo.close();
- if (bd_po != null) bd_po.close();
- if (bd_osp != null) bd_osp.close();
+ if (bd_spo != null) {
+ bd_spo.close();
+ }
+ if (bd_po != null) {
+ bd_po.close();
+ }
+ if (bd_osp != null) {
+ bd_osp.close();
+ }
}
}
@@ -520,10 +525,17 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
}
- private void checkVersion() throws RyaDAOException {
+ private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
String version = getVersion();
if (version == null) {
- this.add(getVersionRyaStatement());
+ //adding to core Rya tables but not Indexes
+ Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
+ Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+ Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+ Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+ bw_spo.addMutations(spo);
+ bw_po.addMutations(po);
+ bw_osp.addMutations(osp);
}
//TODO: Do a version check here
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
index 28afce7..c15ba1b 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java
@@ -25,6 +25,21 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.indexing.FilterFunctionOptimizer;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
+import mvm.rya.indexing.accumulo.freetext.Tokenizer;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
+import mvm.rya.indexing.mongodb.geo.MongoGeoIndexer;
+import mvm.rya.indexing.pcj.matching.PCJOptimizer;
+
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
@@ -47,22 +62,9 @@ import org.apache.log4j.Logger;
import org.openrdf.model.URI;
import org.openrdf.model.impl.URIImpl;
+import com.google.common.base.Optional;
import com.google.common.collect.Lists;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.indexing.FilterFunctionOptimizer;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
-import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
-import mvm.rya.indexing.accumulo.freetext.Tokenizer;
-import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
-import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
-import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
-import mvm.rya.indexing.mongodb.geo.MongoGeoIndexer;
-import mvm.rya.indexing.pcj.matching.PCJOptimizer;
-
/**
* A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
*/
@@ -96,6 +98,12 @@ public class ConfigUtils {
public static final String USE_PCJ = "sc.use_pcj";
public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
+ public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+ public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
+ public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
+ public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+
+
public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail";
public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail";
@@ -366,6 +374,17 @@ public class ConfigUtils {
return conf.getBoolean(USE_OPTIMAL_PCJ, false);
}
+
+ /**
+ * @return The name of the Fluo Application this instance of RYA is
+ * using to incrementally update PCJs.
+ */
+ //TODO delete this eventually and use Details table
+ public Optional<String> getFluoAppName(Configuration conf) {
+ return Optional.fromNullable(conf.get(FLUO_APP_NAME));
+ }
+
+
public static boolean getUseMongo(final Configuration conf) {
return conf.getBoolean(USE_MONGO, false);
}
@@ -391,6 +410,7 @@ public class ConfigUtils {
if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
conf.setPcjOptimizer(PCJOptimizer.class);
+ indexList.add(PrecomputedJoinIndexer.class.getName());
}
if (getUseGeo(conf)) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
index 6aaf2c4..0324a79 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
@@ -28,6 +28,15 @@ import java.util.Set;
import javax.annotation.ParametersAreNonnullByDefault;
+import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage;
+import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
+import mvm.rya.indexing.external.fluo.PcjUpdaterSupplierFactory;
+
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.hadoop.conf.Configuration;
@@ -41,21 +50,13 @@ import org.openrdf.model.URI;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
-import mvm.rya.accumulo.experimental.AccumuloIndexer;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.persist.RyaDAO;
-import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage;
-import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
-import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier;
-
/**
* Updates the state of the Precomputed Join indices that are used by Rya.
*/
@ParametersAreNonnullByDefault
-public class PrecomputedJoinIndexer implements AccumuloIndexer {
- private static final Logger log = Logger.getLogger(PrecomputedJoinIndexer.class);
+public class PrecomputedJoinIndexer extends AbstractAccumuloIndexer {
+ private static final Logger log = Logger
+ .getLogger(PrecomputedJoinIndexer.class);
/**
* This configuration object must be set before {@link #init()} is invoked.
@@ -64,14 +65,14 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
private Optional<Configuration> conf = Optional.absent();
/**
- * The Accumulo Connector that must be used when accessing an Accumulo storage.
- * This value is provided by {@link #setConnector(Connector)}.
+ * The Accumulo Connector that must be used when accessing an Accumulo
+ * storage. This value is provided by {@link #setConnector(Connector)}.
*/
private Optional<Connector> accumuloConn = Optional.absent();
/**
- * Provides access to the {@link Configuration} that was provided to this class
- * using {@link #setConf(Configuration)}.
+ * Provides access to the {@link Configuration} that was provided to this
+ * class using {@link #setConf(Configuration)}.
*/
private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() {
@Override
@@ -92,23 +93,22 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
};
/**
- * Creates and grants access to the {@link PrecomputedJoinStorage} that will be used
- * to interact with the PCJ results that are stored and used by Rya.
+ * Creates and grants access to the {@link PrecomputedJoinStorage} that will
+ * be used to interact with the PCJ results that are stored and used by Rya.
*/
- private final PrecomputedJoinStorageSupplier pcjStorageSupplier =
- new PrecomputedJoinStorageSupplier(
- configSupplier,
- new AccumuloPcjStorageSupplier(configSupplier, accumuloSupplier));
+ private final PrecomputedJoinStorageSupplier pcjStorageSupplier = new PrecomputedJoinStorageSupplier(
+ configSupplier, new AccumuloPcjStorageSupplier(configSupplier,
+ accumuloSupplier));
+
+ private PrecomputedJoinStorage pcjStorage;
/**
- * Creates and grants access to the {@link PrecomputedJoinUpdater} that will
+ * Creates and grants access to the {@link PrecomputedJoinUpdater}s that will
* be used to update the state stored within the PCJ tables that are stored
* in Accumulo.
*/
- private final PrecomputedJoinUpdaterSupplier pcjUpdaterSupplier =
- new PrecomputedJoinUpdaterSupplier(
- configSupplier,
- new FluoPcjUpdaterSupplier(configSupplier));
+ private Supplier<PrecomputedJoinUpdater> updaterSupplier;
+
@Override
public void setConf(final Configuration conf) {
@@ -127,7 +127,7 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
@Override
public void setConnector(final Connector connector) {
checkNotNull(connector);
- accumuloConn = Optional.of( connector );
+ accumuloConn = Optional.of(connector);
}
/**
@@ -135,40 +135,48 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
*/
@Override
public void init() {
- pcjStorageSupplier.get();
- pcjUpdaterSupplier.get();
+ pcjStorage = pcjStorageSupplier.get();
+ updaterSupplier = new PcjUpdaterSupplierFactory(configSupplier).getSupplier();
+ updaterSupplier.get();
}
@Override
public void storeStatement(final RyaStatement statement) throws IOException {
checkNotNull(statement);
- storeStatements( Collections.singleton(statement) );
+ storeStatements(Collections.singleton(statement));
}
@Override
- public void storeStatements(final Collection<RyaStatement> statements) throws IOException {
+ public void storeStatements(final Collection<RyaStatement> statements)
+ throws IOException {
checkNotNull(statements);
try {
- pcjUpdaterSupplier.get().addStatements(statements);
+ updaterSupplier.get().addStatements(statements);
} catch (final PcjUpdateException e) {
- throw new IOException("Could not update the PCJs by adding the provided statements.", e);
+ throw new IOException(
+ "Could not update the PCJs by adding the provided statements.",
+ e);
}
}
@Override
- public void deleteStatement(final RyaStatement statement) throws IOException {
+ public void deleteStatement(final RyaStatement statement)
+ throws IOException {
checkNotNull(statement);
try {
- pcjUpdaterSupplier.get().deleteStatements( Collections.singleton(statement) );
+ Collection<RyaStatement> statements = Collections.singleton(statement);
+ updaterSupplier.get().deleteStatements(statements);
} catch (final PcjUpdateException e) {
- throw new IOException("Could not update the PCJs by removing the provided statement.", e);
+ throw new IOException(
+ "Could not update the PCJs by removing the provided statement.",
+ e);
}
}
@Override
public void flush() throws IOException {
try {
- pcjUpdaterSupplier.get().flush();
+ updaterSupplier.get().flush();
} catch (final PcjUpdateException e) {
throw new IOException("Could not flush the PCJ Updater.", e);
}
@@ -177,13 +185,13 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
@Override
public void close() {
try {
- pcjStorageSupplier.get().close();
+ pcjStorage.close();
} catch (final PCJStorageException e) {
log.error("Could not close the PCJ Storage instance.", e);
}
try {
- pcjUpdaterSupplier.get().close();
+ updaterSupplier.get().close();
} catch (final PcjUpdateException e) {
log.error("Could not close the PCJ Updater instance.", e);
}
@@ -198,48 +206,56 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
}
/**
- * Deletes all data from the PCJ indices that are managed by a {@link PrecomputedJoinStorage}.
+ * Deletes all data from the PCJ indices that are managed by a
+ * {@link PrecomputedJoinStorage}.
*/
@Override
public void purge(final RdfCloudTripleStoreConfiguration configuration) {
- final PrecomputedJoinStorage storage = pcjStorageSupplier.get();
try {
- for(final String pcjId : storage.listPcjs()) {
+ for (final String pcjId : pcjStorage.listPcjs()) {
try {
- storage.purge(pcjId);
- } catch(final PCJStorageException e) {
- log.error("Could not purge the PCJ index with id: " + pcjId, e);
+ pcjStorage.purge(pcjId);
+ } catch (final PCJStorageException e) {
+ log.error(
+ "Could not purge the PCJ index with id: " + pcjId,
+ e);
}
}
} catch (final PCJStorageException e) {
- log.error("Could not purge the PCJ indicies because they could not be listed.", e);
+ log.error(
+ "Could not purge the PCJ indicies because they could not be listed.",
+ e);
}
}
/**
- * Deletes all of the PCJ indices that are managed by {@link PrecomputedJoinStorage}.
+ * Deletes all of the PCJ indices that are managed by
+ * {@link PrecomputedJoinStorage}.
*/
@Override
public void dropAndDestroy() {
- final PrecomputedJoinStorage storage = pcjStorageSupplier.get();
-
try {
- for(final String pcjId : storage.listPcjs()) {
+ for (final String pcjId : pcjStorage.listPcjs()) {
try {
- storage.dropPcj(pcjId);
- } catch(final PCJStorageException e) {
- log.error("Could not delete the PCJ index with id: " + pcjId, e);
+ pcjStorage.dropPcj(pcjId);
+ } catch (final PCJStorageException e) {
+ log.error("Could not delete the PCJ index with id: "
+ + pcjId, e);
}
}
- } catch(final PCJStorageException e) {
- log.error("Could not delete the PCJ indicies because they could not be listed.", e);
+ } catch (final PCJStorageException e) {
+ log.error(
+ "Could not delete the PCJ indicies because they could not be listed.",
+ e);
}
}
@Override
- public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException {
- // We do not need to use the writer that also writes to the core RYA tables.
+ public void setMultiTableBatchWriter(final MultiTableBatchWriter writer)
+ throws IOException {
+ // We do not need to use the writer that also writes to the core RYA
+ // tables.
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
index c56f574..3c76601 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java
@@ -22,14 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import javax.annotation.ParametersAreNonnullByDefault;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
import com.google.common.base.Optional;
-import mvm.rya.api.persist.index.RyaSecondaryIndexer;
-
/**
* Inspects the {@link Configuration} object that is provided to all instances
* of {@link RyaSecondaryIndexer} to provide {@link PrecomputedJoinIndexer}
@@ -55,7 +56,7 @@ public class PrecomputedJoinIndexerConfig {
/**
* Incrementally updates the PCJs is pseudo-realtime new adds/deletes are encountered.
*/
- FLUO;
+ FLUO, NO_UPDATE;
}
// Indicates which implementation of PrecomputedJoinStorage to use.
@@ -63,6 +64,7 @@ public class PrecomputedJoinIndexerConfig {
// Indicates which implementation of PrecomputedJoinUpdater to use.
public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+ public static final String USE_PCJ_FLUO_UPDATER = ConfigUtils.USE_PCJ_FLUO_UPDATER;
// The configuration object that is provided to Secondary Indexing implementations.
private final Configuration config;
@@ -104,6 +106,13 @@ public class PrecomputedJoinIndexerConfig {
return Optional.fromNullable(updaterType);
}
+
+
+ public boolean getUseFluoUpdater() {
+ return config.getBoolean(USE_PCJ_FLUO_UPDATER, false);
+ }
+
+
/**
* @return The configuration object that has been wrapped.
*/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
index 4378a4a..2a34a82 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java
@@ -19,20 +19,19 @@
package mvm.rya.indexing.external.fluo;
import static com.google.common.base.Preconditions.checkNotNull;
+import mvm.rya.indexing.accumulo.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Optional;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-
/**
* Configuration values required to initialize a {@link FluoPcjUpdater}.
*/
public final class FluoPcjUpdaterConfig {
// Defines which Fluo application is running for this instance of Rya.
- public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+ public static final String FLUO_APP_NAME = ConfigUtils.FLUO_APP_NAME;
// Values that define which Accumulo instance hosts the Fluo application's table.
public static final String ACCUMULO_ZOOKEEPERS = ConfigUtils.CLOUDBASE_ZOOKEEPERS;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
index ed202f4..61de078 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
@@ -26,25 +26,26 @@ import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERN
import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS;
import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME;
import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY;
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.config.FluoConfiguration;
import javax.annotation.ParametersAreNonnullByDefault;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
-
/**
* Creates instances of {@link FluoPcjUpdater} using the values found in a {@link Configuration}.
*/
@ParametersAreNonnullByDefault
-public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> {
+public class FluoPcjUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> {
private final Supplier<Configuration> configSupplier;
@@ -66,7 +67,7 @@ public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> {
final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config);
final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType();
- checkArgument(updaterType.isPresent() && (updaterType.get() == PrecomputedJoinUpdaterType.FLUO),
+ checkArgument(updaterType.isPresent() && updaterType.get() == PrecomputedJoinUpdaterType.FLUO,
"This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE +
"' value be set to '" + PrecomputedJoinUpdaterType.FLUO + "'.");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java
new file mode 100644
index 0000000..66a6b24
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import java.util.Collection;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+/**
+ * A NoOp updater (which does nothing) to be used by {@link PrecomputedJoinIndexer} if neither Batch nor
+ * {@link FluoPcjUpdater} is specified by the user to update Precomputed Joins.
+ *
+ */
+public class NoOpUpdater implements PrecomputedJoinUpdater {
+
+ @Override
+ public void addStatements(Collection<RyaStatement> statements)
+ throws PcjUpdateException {
+ }
+
+ @Override
+ public void deleteStatements(Collection<RyaStatement> statements)
+ throws PcjUpdateException {
+ }
+
+ @Override
+ public void flush() throws PcjUpdateException {
+ }
+
+ @Override
+ public void close() throws PcjUpdateException {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java
new file mode 100644
index 0000000..6831353
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Supplier;
+
+/**
+ * A {@link Supplier} for {@link NoOpUdater}s. This Supplier is used by
+ * {@link PrecomputedJoinIndexer} when no update strategy is specified by the user.
+ *
+ */
+public class NoOpUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> {
+
+ @Override
+ public NoOpUpdater get() {
+ return new NoOpUpdater();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java
new file mode 100644
index 0000000..0250b6d
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.fluo;
+
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * A factory for {@link Supplier}s used by {@link PrecomputedJoinIndexer} to
+ * get all update strategies for precomputed joins for a given Rya instance.
+ *
+ */
+public class PcjUpdaterSupplierFactory {
+
+ private Supplier<Configuration> configSupplier;
+
+ public PcjUpdaterSupplierFactory(Supplier<Configuration> configSupplier) {
+ this.configSupplier = configSupplier;
+ }
+
+ public Supplier<PrecomputedJoinUpdater> getSupplier() {
+
+ PrecomputedJoinIndexerConfig config = new PrecomputedJoinIndexerConfig(configSupplier.get());
+ //TODO this should not be read from the config. Instead,
+ //this information should be retrieved from the RyaDetails table
+ if(config.getUseFluoUpdater()) {
+ return Suppliers.memoize(new FluoPcjUpdaterSupplier(configSupplier));
+ }
+ else {
+ return Suppliers.memoize(new NoOpUpdaterSupplier());
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 6ef282f..6ca8cd7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,25 +1,15 @@
<?xml version="1.0" encoding="utf-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project
- xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
@@ -27,13 +17,13 @@ under the License.
<artifactId>rya.pcj.fluo.parent</artifactId>
<version>3.2.10-SNAPSHOT</version>
</parent>
-
+
<modelVersion>4.0.0</modelVersion>
<artifactId>rya.pcj.fluo.integration</artifactId>
-
+
<name>Apache Rya PCJ Fluo Integration Tests</name>
<description>Integration tests for the Rya Fluo application.</description>
-
+
<dependencies>
<!-- Rya Runtime Dependencies. -->
<dependency>
@@ -48,7 +38,10 @@ under the License.
<groupId>org.apache.rya</groupId>
<artifactId>rya.pcj.fluo.client</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
<!-- Testing dependencies. -->
<dependency>
<groupId>io.fluo</groupId>
@@ -60,5 +53,9 @@ under the License.
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.fluo</groupId>
+ <artifactId>fluo-api</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index 618cab9..154156f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -20,8 +20,23 @@ package org.apache.rya.indexing.pcj.fluo;
import static com.google.common.base.Preconditions.checkNotNull;
+import io.fluo.api.client.FluoAdmin;
+import io.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import io.fluo.api.client.FluoAdmin.TableExistsException;
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.FluoConfiguration;
+import io.fluo.api.config.ObserverConfiguration;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.mini.MiniFluo;
+
import java.io.File;
import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,6 +44,20 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.UUID;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+import mvm.rya.rdftriplestore.inference.InferenceEngineException;
+import mvm.rya.sail.config.RyaSailFactory;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -37,6 +66,7 @@ import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
@@ -57,112 +87,91 @@ import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.MapBindingSet;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.Sail;
import com.google.common.io.Files;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-import io.fluo.api.mini.MiniFluo;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaToRdfConversions;
-import mvm.rya.rdftriplestore.RdfCloudTripleStore;
-import mvm.rya.rdftriplestore.RyaSailRepository;
-
/**
- * Integration tests that ensure the Fluo application processes PCJs results correctly.
+ * Integration tests that ensure the Fluo application processes PCJs results
+ * correctly.
* <p>
* This class is being ignored because it doesn't contain any unit tests.
*/
public abstract class ITBase {
private static final Logger log = Logger.getLogger(ITBase.class);
- public static final String USE_MOCK_INSTANCE = ".useMockInstance";
- public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
- public static final String CLOUDBASE_USER = "sc.cloudbase.username";
- public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";
-
protected static final String RYA_TABLE_PREFIX = "demo_";
+
+ protected static final String ACCUMULO_USER = "root";
+ protected static final String ACCUMULO_PASSWORD = "password";
// Rya data store and connections.
- protected MiniAccumuloCluster accumulo = null;
- protected static Connector accumuloConn = null;
protected RyaSailRepository ryaRepo = null;
protected RepositoryConnection ryaConn = null;
+
+ // Mini Accumulo Cluster
+ protected MiniAccumuloCluster cluster;
+ protected static Connector accumuloConn = null;
+ protected String instanceName = null;
+ protected String zookeepers = null;
+
// Fluo data store and connections.
protected MiniFluo fluo = null;
protected FluoClient fluoClient = null;
+ protected final String appName = "IntegrationTests";
@Before
- public void setupMiniResources() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException {
- // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it.
- accumulo = startMiniAccumulo();
-
- // Setup the Rya library to use the Mini Accumulo.
- ryaRepo = setupRya(accumulo);
- ryaConn = ryaRepo.getConnection();
-
+ public void setupMiniResources()
+ throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException,
+ RyaDAOException, NumberFormatException, InferenceEngineException, AlreadyInitializedException, TableExistsException {
+ // Initialize the Mini Accumulo that will be used to host Rya and Fluo.
+ setupMiniAccumulo();
+
// Initialize the Mini Fluo that will be used to store created queries.
fluo = startMiniFluo();
- fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() );
- }
+ fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
+ // Initialize the Rya that will be used by the tests.
+ ryaRepo = setupRya(ACCUMULO_USER, ACCUMULO_PASSWORD, instanceName, zookeepers, appName);
+ ryaConn = ryaRepo.getConnection();
+ }
+
@After
public void shutdownMiniResources() {
- if(ryaConn != null) {
+ // TODO shutdown the cluster
+
+ if (ryaConn != null) {
try {
log.info("Shutting down Rya Connection.");
ryaConn.close();
log.info("Rya Connection shut down.");
- } catch(final Exception e) {
+ } catch (final Exception e) {
log.error("Could not shut down the Rya Connection.", e);
}
}
- if(ryaRepo != null) {
+ if (ryaRepo != null) {
try {
log.info("Shutting down Rya Repo.");
ryaRepo.shutDown();
log.info("Rya Repo shut down.");
- } catch(final Exception e) {
+ } catch (final Exception e) {
log.error("Could not shut down the Rya Repo.", e);
}
}
-
- if(accumulo != null) {
- try {
- log.info("Shutting down the Mini Accumulo being used as a Rya store.");
- accumulo.stop();
- log.info("Mini Accumulo being used as a Rya store shut down.");
- } catch(final Exception e) {
- log.error("Could not shut down the Mini Accumulo.", e);
- }
- }
-
- if(fluoClient != null) {
+
+ if (fluoClient != null) {
try {
log.info("Shutting down Fluo Client.");
fluoClient.close();
log.info("Fluo Client shut down.");
- } catch(final Exception e) {
+ } catch (final Exception e) {
log.error("Could not shut down the Fluo Client.", e);
}
}
- if(fluo != null) {
+ if (fluo != null) {
try {
log.info("Shutting down Mini Fluo.");
fluo.close();
@@ -171,28 +180,44 @@ public abstract class ITBase {
log.error("Could not shut down the Mini Fluo.", e);
}
}
+
+ if(cluster != null) {
+ try {
+ log.info("Shutting down the Mini Accumulo being used as a Rya store.");
+ cluster.stop();
+ log.info("Mini Accumulo being used as a Rya store shut down.");
+ } catch(final Exception e) {
+ log.error("Could not shut down the Mini Accumulo.", e);
+ }
+ }
}
/**
- * A helper fuction for creating a {@link BindingSet} from an array of {@link Binding}s.
+ * A helper fuction for creating a {@link BindingSet} from an array of
+ * {@link Binding}s.
*
- * @param bindings - The bindings to include in the set. (not null)
+ * @param bindings
+ * - The bindings to include in the set. (not null)
* @return A {@link BindingSet} holding the bindings.
*/
protected static BindingSet makeBindingSet(final Binding... bindings) {
final MapBindingSet bindingSet = new MapBindingSet();
- for(final Binding binding : bindings) {
+ for (final Binding binding : bindings) {
bindingSet.addBinding(binding);
}
return bindingSet;
}
/**
- * A helper function for creating a {@link RyaStatement} that represents a Triple.
+ * A helper function for creating a {@link RyaStatement} that represents a
+ * Triple.
*
- * @param subject - The Subject of the Triple. (not null)
- * @param predicate - The Predicate of the Triple. (not null)
- * @param object - The Object of the Triple. (not null)
+ * @param subject
+ * - The Subject of the Triple. (not null)
+ * @param predicate
+ * - The Predicate of the Triple. (not null)
+ * @param object
+ * - The Object of the Triple. (not null)
* @return A Triple as a {@link RyaStatement}.
*/
protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) {
@@ -200,44 +225,48 @@ public abstract class ITBase {
checkNotNull(predicate);
checkNotNull(object);
- final RyaStatementBuilder builder = RyaStatement.builder()
- .setSubject( new RyaURI(subject) )
- .setPredicate( new RyaURI(predicate) );
+ final RyaStatementBuilder builder = RyaStatement.builder().setSubject(new RyaURI(subject))
+ .setPredicate(new RyaURI(predicate));
- if(object.startsWith("http://")) {
- builder.setObject(new RyaURI(object) );
+ if (object.startsWith("http://")) {
+ builder.setObject(new RyaURI(object));
} else {
- builder.setObject( new RyaType(object) );
+ builder.setObject(new RyaType(object));
}
return builder.build();
}
/**
- * A helper function for creating a {@link RyaStatement} that represents a Triple.
+ * A helper function for creating a {@link RyaStatement} that represents a
+ * Triple.
*
- * @param subject - The Subject of the Triple. (not null)
- * @param predicate - The Predicate of the Triple. (not null)
- * @param object - The Object of the Triple. (not null)
+ * @param subject
+ * - The Subject of the Triple. (not null)
+ * @param predicate
+ * - The Predicate of the Triple. (not null)
+ * @param object
+ * - The Object of the Triple. (not null)
* @return A Triple as a {@link RyaStatement}.
*/
protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) {
checkNotNull(subject);
checkNotNull(predicate);
- return RyaStatement.builder()
- .setSubject(new RyaURI(subject))
- .setPredicate(new RyaURI(predicate))
- .setObject( new RyaType(XMLSchema.INT, "" + object) )
- .build();
+ return RyaStatement.builder().setSubject(new RyaURI(subject)).setPredicate(new RyaURI(predicate))
+ .setObject(new RyaType(XMLSchema.INT, "" + object)).build();
}
/**
- * A helper function for creating a Sesame {@link Statement} that represents a Triple..
+ * A helper function for creating a Sesame {@link Statement} that represents
+ * a Triple..
*
- * @param subject - The Subject of the Triple. (not null)
- * @param predicate - The Predicate of the Triple. (not null)
- * @param object - The Object of the Triple. (not null)
+ * @param subject
+ * - The Subject of the Triple. (not null)
+ * @param predicate
+ * - The Predicate of the Triple. (not null)
+ * @param object
+ * - The Object of the Triple. (not null)
* @return A Triple as a {@link Statement}.
*/
protected static Statement makeStatement(final String subject, final String predicate, final String object) {
@@ -253,14 +282,17 @@ public abstract class ITBase {
* Fetches the binding sets that are the results of a specific SPARQL query
* from the Fluo table.
*
- * @param fluoClient- A connection to the Fluo table where the results reside. (not null)
- * @param sparql - This query's results will be fetched. (not null)
+ * @param fluoClient-
+ * A connection to the Fluo table where the results reside. (not
+ * null)
+ * @param sparql
+ * - This query's results will be fetched. (not null)
* @return The binding sets for the query's results.
*/
protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) {
final Set<BindingSet> bindingSets = new HashSet<>();
- try(Snapshot snapshot = fluoClient.newSnapshot()) {
+ try (Snapshot snapshot = fluoClient.newSnapshot()) {
final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString();
// Fetch the query's variable order.
@@ -269,12 +301,13 @@ public abstract class ITBase {
// Fetch the Binding Sets for the query.
final ScannerConfiguration scanConfig = new ScannerConfiguration();
- scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
+ scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(),
+ FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
BindingSetStringConverter converter = new BindingSetStringConverter();
final RowIterator rowIter = snapshot.get(scanConfig);
- while(rowIter.hasNext()) {
+ while (rowIter.hasNext()) {
final Entry<Bytes, ColumnIterator> row = rowIter.next();
final String bindingSetString = row.getValue().next().getValue().toString();
final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
@@ -285,77 +318,94 @@ public abstract class ITBase {
return bindingSets;
}
- /**
- * Setup a Mini Accumulo cluster that uses a temporary directory to store its data.
- *
- * @return A Mini Accumulo cluster.
- */
- private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
- final File miniDataDir = Files.createTempDir();
-
- // Setup and start the Mini Accumulo.
- final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password");
- accumulo.start();
-
- // Store a connector to the Mini Accumulo.
- final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
- accumuloConn = instance.getConnector("root", new PasswordToken("password"));
-
- return accumulo;
+ private void setupMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
+ File miniDataDir = Files.createTempDir();
+
+ // Setup and start the Mini Accumulo.
+ MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, ACCUMULO_PASSWORD);
+ cluster = new MiniAccumuloCluster(cfg);
+ cluster.start();
+
+ // Store a connector to the Mini Accumulo.
+ instanceName = cluster.getInstanceName();
+ zookeepers = cluster.getZooKeepers();
+
+ Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
+ accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD));
}
- /**
- * Format a Mini Accumulo to be a Rya repository.
- *
- * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null)
- * @return The Rya repository sitting on top of the Mini Accumulo.
- */
- private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException {
- checkNotNull(accumulo);
-
- // Setup the Rya Repository that will be used to create Repository Connections.
- final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
- final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
- crdfdao.setConnector(accumuloConn);
+ /**
+ * Sets up a Rya instance
+ *
+ * @param user
+ * @param password
+ * @param instanceName
+ * @param zookeepers
+ * @param appName
+ * @return
+ * @throws AccumuloException
+ * @throws AccumuloSecurityException
+ * @throws RepositoryException
+ * @throws RyaDAOException
+ * @throws NumberFormatException
+ * @throws UnknownHostException
+ * @throws InferenceEngineException
+ */
+ protected static RyaSailRepository setupRya(String user, String password, String instanceName, String zookeepers, String appName)
+ throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException,
+ NumberFormatException, UnknownHostException, InferenceEngineException {
+
+ checkNotNull(user);
+ checkNotNull(password);
+ checkNotNull(instanceName);
+ checkNotNull(zookeepers);
+ checkNotNull(appName);
// Setup Rya configuration values.
final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix("demo_");
+ conf.setTablePrefix(RYA_TABLE_PREFIX);
conf.setDisplayQueryPlan(true);
-
- conf.setBoolean(USE_MOCK_INSTANCE, true);
- conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX);
- conf.set(CLOUDBASE_USER, "root");
- conf.set(CLOUDBASE_PASSWORD, "password");
- conf.set(CLOUDBASE_INSTANCE, accumulo.getInstanceName());
-
- crdfdao.setConf(conf);
- ryaStore.setRyaDAO(crdfdao);
-
- final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
+ conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false);
+ conf.set(ConfigUtils.CLOUDBASE_USER, user);
+ conf.set(ConfigUtils.CLOUDBASE_PASSWORD, password);
+ conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
+ conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers);
+ conf.set(ConfigUtils.USE_PCJ, "true");
+ conf.set(ConfigUtils.USE_PCJ_FLUO_UPDATER, "true");
+ conf.set(ConfigUtils.FLUO_APP_NAME, appName);
+ conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
+ PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+ conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+ conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
+
+ Sail sail = RyaSailFactory.getInstance(conf);
+ final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
ryaRepo.initialize();
return ryaRepo;
}
/**
- * Override this method to provide an output configuration to the Fluo application.
+ * Override this method to provide an output configuration to the Fluo
+ * application.
* <p>
* Returns an empty map by default.
*
- * @return The parameters that will be passed to {@link QueryResultObserver} at startup.
+ * @return The parameters that will be passed to {@link QueryResultObserver}
+ * at startup.
*/
protected Map<String, String> makeExportParams() {
return new HashMap<>();
}
/**
- * Setup a Mini Fluo cluster that uses a temporary directory to store its data.ll
+ * Setup a Mini Fluo cluster that uses a temporary directory to store its
+ * data.ll
*
* @return A Mini Fluo cluster.
*/
- protected MiniFluo startMiniFluo() {
- final File miniDataDir = Files.createTempDir();
+ protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
+// final File miniDataDir = Files.createTempDir();
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverConfiguration> observers = new ArrayList<>();
@@ -364,18 +414,29 @@ public abstract class ITBase {
observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
observers.add(new ObserverConfiguration(FilterObserver.class.getName()));
- // Provide export parameters child test classes may provide to the export observer.
- final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName());
- exportObserverConfig.setParameters( makeExportParams() );
+ // Provide export parameters child test classes may provide to the
+ // export observer.
+ final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(
+ QueryResultObserver.class.getName());
+ exportObserverConfig.setParameters(makeExportParams());
observers.add(exportObserverConfig);
// Configure how the mini fluo will run.
final FluoConfiguration config = new FluoConfiguration();
- config.setApplicationName("IntegrationTests");
- config.setMiniDataDir(miniDataDir.getAbsolutePath());
+ config.setMiniStartAccumulo(false);
+ config.setAccumuloInstance(instanceName);
+ config.setAccumuloUser(ACCUMULO_USER);
+ config.setAccumuloPassword(ACCUMULO_PASSWORD);
+ config.setInstanceZookeepers(zookeepers + "/fluo");
+ config.setAccumuloZookeepers(zookeepers);
+
+ config.setApplicationName(appName);
+ config.setAccumuloTable("fluo" + appName);
+
config.addObservers(observers);
- final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
- return miniFluo;
+ FluoFactory.newAdmin(config).initialize(
+ new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+ return FluoFactory.newMiniFluo(config);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 566d2d2..124f5a9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -31,6 +31,9 @@ import org.junit.Test;
import com.google.common.base.Optional;
import com.google.common.io.Files;
+import io.fluo.api.client.FluoAdmin;
+import io.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import io.fluo.api.client.FluoAdmin.TableExistsException;
import io.fluo.api.client.FluoFactory;
import io.fluo.api.config.FluoConfiguration;
import io.fluo.api.config.ObserverConfiguration;
@@ -48,24 +51,34 @@ public class CountStatementsIT extends ITBase {
* statements are inserted as part of the test will not be consumed.
*
* @return A Mini Fluo cluster.
+ * @throws TableExistsException
*/
@Override
- protected MiniFluo startMiniFluo() {
- final File miniDataDir = Files.createTempDir();
-
+ protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverConfiguration> observers = new ArrayList<>();
// Configure how the mini fluo will run.
final FluoConfiguration config = new FluoConfiguration();
- config.setApplicationName("IntegrationTests");
- config.setMiniDataDir(miniDataDir.getAbsolutePath());
+ config.setMiniStartAccumulo(false);
+ config.setAccumuloInstance(instanceName);
+ config.setAccumuloUser(ACCUMULO_USER);
+ config.setAccumuloPassword(ACCUMULO_PASSWORD);
+ config.setInstanceZookeepers(zookeepers + "/fluo");
+ config.setAccumuloZookeepers(zookeepers);
+
+ config.setApplicationName(appName);
+ config.setAccumuloTable("fluo" + appName);
+
config.addObservers(observers);
+ FluoFactory.newAdmin(config).initialize(
+ new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
return miniFluo;
}
+
@Test
public void test() {
// Insert some Triples into the Fluo app.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index 1ebc29b..873b78e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -19,6 +19,8 @@
package org.apache.rya.indexing.pcj.fluo.integration;
import static org.junit.Assert.assertEquals;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.data.Bytes;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,6 +28,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import mvm.rya.api.domain.RyaStatement;
+
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -54,10 +58,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.data.Bytes;
-import mvm.rya.api.domain.RyaStatement;
-
/**
* Performs integration tests over the Fluo application geared towards Rya PCJ exporting.
* <p>
@@ -77,10 +77,10 @@ public class RyaExportIT extends ITBase {
final RyaExportParameters ryaParams = new RyaExportParameters(params);
ryaParams.setExportToRya(true);
- ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
- ryaParams.setZookeeperServers(accumulo.getZooKeepers());
- ryaParams.setExporterUsername("root");
- ryaParams.setExporterPassword("password");
+ ryaParams.setAccumuloInstanceName(instanceName);
+ ryaParams.setZookeeperServers(zookeepers);
+ ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
+ ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
return params;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
new file mode 100644
index 0000000..68fd842
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+import io.fluo.api.client.FluoClient;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.repository.RepositoryConnection;
+
+import com.google.common.collect.Sets;
+
+
+/**
+ * This test ensures that the correct updates are pushed by Fluo
+ * to the external PCJ table as triples are added to Rya through
+ * the {@link RepositoryConnection}. The key difference between these
+ * tests and those in {@link InputIT} is that streaming triples are added through
+ * the RepositoryConnection and not through the {@link FluoClient}. These tests are
+ * designed to verify that the {@link AccumuloRyaDAO} has been integrated
+ * with the {@link PrecomputedJoinIndexer} and that the associated {@link PrecomputedJoinUpdater} updates
+ * Fluo accordingly.
+ *
+ */
+
+public class RyaInputIncrementalUpdateIT extends ITBase {
+
+ /**
+ * Ensure historic matches are included in the result.
+ */
+ @Test
+ public void streamResultsThroughRya() throws Exception {
+
+ // A query that finds people who talk to Eve and work at Chipotle.
+ final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. "
+ + "?x <http://worksAt> <http://Chipotle>." + "}";
+
+ // Triples that are loaded into Rya before the PCJ is created.
+ final Set<Statement> historicTriples = Sets.newHashSet(
+ makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+ makeStatement("http://Bob", "http://talksTo", "http://Eve"),
+ makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
+
+ makeStatement("http://Eve", "http://helps", "http://Kevin"),
+
+ makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
+ makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
+ makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
+ makeStatement("http://David", "http://worksAt", "http://Chipotle"));
+
+ // The expected results of the SPARQL query once the PCJ has been
+ // computed.
+ final Set<BindingSet> expected = new HashSet<>();
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob"))));
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+ // Create the PCJ in Fluo.
+ new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn,
+ new HashSet<VariableOrder>(), sparql);
+
+ // Verify the end results of the query match the expected results.
+ fluo.waitForObservers();
+
+ // Load the historic data into Rya.
+ for (final Statement triple : historicTriples) {
+ ryaConn.add(triple);
+ }
+
+ fluo.waitForObservers();
+
+ final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+ assertEquals(expected, results);
+ }
+
+ /**
+ * Simulates the case where a Triple is added to Rya, a new query that
+ * includes that triple as a historic match is inserted into Fluo, and then
+ * some new triple that matches the query is streamed into Fluo. The query's
+ * results must include both the historic result and the newly streamed
+ * result.
+ */
+ @Test
+ public void historicThenStreamedResults() throws Exception {
+ // A query that finds people who talk to Eve and work at Chipotle.
+ final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. "
+ + "?x <http://worksAt> <http://Chipotle>." + "}";
+
+ // Triples that are loaded into Rya before the PCJ is created.
+ final Set<Statement> historicTriples = Sets.newHashSet(
+ makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+ makeStatement("http://Alice", "http://worksAt", "http://Chipotle"),
+ makeStatement("http://Joe", "http://worksAt", "http://Chipotle"));
+
+ // Triples that will be streamed into Fluo after the PCJ has been
+ final Set<Statement> streamedTriples = Sets.newHashSet(
+ makeStatement("http://Frank", "http://talksTo", "http://Eve"),
+ makeStatement("http://Joe", "http://talksTo", "http://Eve"),
+ makeStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+
+ // Load the historic data into Rya.
+ for (final Statement triple : historicTriples) {
+ ryaConn.add(triple);
+ }
+
+ // Create the PCJ in Fluo.
+ new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn,
+ new HashSet<VariableOrder>(), sparql);
+ fluo.waitForObservers();
+
+ // Load the streaming data into Rya.
+ for (final Statement triple : streamedTriples) {
+ ryaConn.add(triple);
+ }
+
+ // Ensure Alice is a match.
+ fluo.waitForObservers();
+ final Set<BindingSet> expected = new HashSet<>();
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice"))));
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank"))));
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe"))));
+
+ Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+ assertEquals(expected, results);
+ }
+
+ @Test
+ public void historicAndStreamMultiVariables() throws Exception {
+ // A query that finds people who talk to Eve and work at Chipotle.
+ // A query that finds people who talk to Eve and work at Chipotle.
+ final String sparql = "SELECT ?x ?y " + "WHERE { " + "?x <http://talksTo> ?y. "
+ + "?x <http://worksAt> <http://Chipotle>." + "}";
+
+ // Triples that are loaded into Rya before the PCJ is created.
+ final Set<Statement> historicTriples = Sets.newHashSet(
+ makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+ makeStatement("http://Alice", "http://worksAt", "http://Chipotle"),
+ makeStatement("http://Joe", "http://worksAt", "http://Chipotle"));
+
+ // Triples that will be streamed into Fluo after the PCJ has been
+ final Set<Statement> streamedTriples = Sets.newHashSet(
+ makeStatement("http://Frank", "http://talksTo", "http://Betty"),
+ makeStatement("http://Joe", "http://talksTo", "http://Alice"),
+ makeStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+
+ // Load the historic data into Rya.
+ for (final Statement triple : historicTriples) {
+ ryaConn.add(triple);
+ }
+
+ // Create the PCJ in Fluo.
+ new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn,
+ new HashSet<VariableOrder>(), sparql);
+ fluo.waitForObservers();
+
+ // Load the streaming data into Rya.
+ for (final Statement triple : streamedTriples) {
+ ryaConn.add(triple);
+ }
+
+ // Ensure Alice is a match.
+ fluo.waitForObservers();
+ final Set<BindingSet> expected = new HashSet<>();
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice")), new BindingImpl("y", new URIImpl("http://Eve"))));
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank")), new BindingImpl("y", new URIImpl("http://Betty"))));
+ expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe")), new BindingImpl("y", new URIImpl("http://Alice"))));
+
+ Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+ assertEquals(expected, results);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index 2e00e6b..95228b8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -20,6 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.visibility;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.data.Bytes;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,6 +29,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
@@ -68,11 +73,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.data.Bytes;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-
public class PcjVisibilityIT extends ITBase {
private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
@@ -86,10 +86,10 @@ public class PcjVisibilityIT extends ITBase {
final RyaExportParameters ryaParams = new RyaExportParameters(params);
ryaParams.setExportToRya(true);
- ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
- ryaParams.setZookeeperServers(accumulo.getZooKeepers());
- ryaParams.setExporterUsername("root");
- ryaParams.setExporterPassword("password");
+ ryaParams.setAccumuloInstanceName(instanceName);
+ ryaParams.setZookeeperServers(zookeepers);
+ ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
+ ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
return params;
}