You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/13 19:05:18 UTC
[5/7] incubator-rya git commit: RYA-142 Fixed Integration Tests for
Fluo Update
RYA-142 Fixed Integration Tests for Fluo Update
Closes #104, closes #100
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/2139edb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/2139edb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/2139edb5
Branch: refs/heads/master
Commit: 2139edb5244c42f11c9440eced2fd0773846f856
Parents: 177c80a
Author: Caleb Meier <ca...@parsons.com>
Authored: Wed Oct 12 12:55:27 2016 -0700
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Thu Oct 13 12:57:09 2016 -0400
----------------------------------------------------------------------
.../mvm/rya/api/client/accumulo/FluoITBase.java | 33 +++++-----
.../apache/rya/indexing/pcj/fluo/ITBase.java | 64 +++++++++-----------
.../pcj/fluo/api/CountStatementsIT.java | 6 +-
.../pcj/fluo/integration/CreateDeleteIT.java | 26 +++-----
.../RyaInputIncrementalUpdateIT.java | 3 +-
5 files changed, 61 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
index 0dcc04c..cc50b90 100644
--- a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
+++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
@@ -55,14 +55,14 @@ import org.openrdf.sail.SailException;
import com.google.common.io.Files;
-import io.fluo.api.client.FluoAdmin;
-import io.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import io.fluo.api.client.FluoAdmin.TableExistsException;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.mini.MiniFluo;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.mini.MiniFluo;
import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.api.client.RyaClientException;
import mvm.rya.api.client.Install;
@@ -224,17 +224,16 @@ public abstract class FluoITBase {
*/
protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
// Setup the observers that will be used by the Fluo PCJ Application.
- final List<ObserverConfiguration> observers = new ArrayList<>();
- observers.add(new ObserverConfiguration(TripleObserver.class.getName()));
- observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName()));
- observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
- observers.add(new ObserverConfiguration(FilterObserver.class.getName()));
+ final List<ObserverSpecification> observers = new ArrayList<>();
+ observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+ observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+ observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+ observers.add(new ObserverSpecification(FilterObserver.class.getName()));
// Provide export parameters child test classes may provide to the
// export observer.
- final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(
- QueryResultObserver.class.getName());
- exportObserverConfig.setParameters(makeExportParams());
+ final ObserverSpecification exportObserverConfig = new ObserverSpecification(
+ QueryResultObserver.class.getName(), makeExportParams());
observers.add(exportObserverConfig);
// Configure how the mini fluo will run.
@@ -252,7 +251,7 @@ public abstract class FluoITBase {
config.addObservers(observers);
FluoFactory.newAdmin(config).initialize(
- new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+ new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
return FluoFactory.newMiniFluo(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index 9e2b9c6..a5288ec 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -25,8 +25,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
@@ -37,6 +37,20 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.mini.MiniFluo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
@@ -63,19 +77,7 @@ import org.openrdf.repository.RepositoryConnection;
import org.openrdf.sail.Sail;
import com.google.common.io.Files;
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.mini.MiniFluo;
+
import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.api.client.Install.InstallConfiguration;
import mvm.rya.api.client.RyaClient;
@@ -283,17 +285,13 @@ public abstract class ITBase {
final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
final VariableOrder varOrder = queryMetadata.getVariableOrder();
- // Fetch the Binding Sets for the query.
- final ScannerConfiguration scanConfig = new ScannerConfiguration();
- scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(),
- FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
-
+ CellScanner cellScanner = snapshot.scanner().fetch(FluoQueryColumns.QUERY_BINDING_SET).build();
final BindingSetStringConverter converter = new BindingSetStringConverter();
- final RowIterator rowIter = snapshot.get(scanConfig);
- while (rowIter.hasNext()) {
- final Entry<Bytes, ColumnIterator> row = rowIter.next();
- final String bindingSetString = row.getValue().next().getValue().toString();
+ Iterator<RowColumnValue> iter = cellScanner.iterator();
+
+ while (iter.hasNext()) {
+ final String bindingSetString = iter.next().getsValue();
final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
bindingSets.add(bindingSet);
}
@@ -378,14 +376,11 @@ public abstract class ITBase {
*/
protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
// Setup the observers that will be used by the Fluo PCJ Application.
- final List<ObserverConfiguration> observers = new ArrayList<>();
- observers.add(new ObserverConfiguration(TripleObserver.class.getName()));
- observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName()));
- observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
- observers.add(new ObserverConfiguration(FilterObserver.class.getName()));
-
- // Configure the export observer to export new PCJ results to the mini accumulo cluster.
- final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName());
+ final List<ObserverSpecification> observers = new ArrayList<>();
+ observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+ observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+ observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+ observers.add(new ObserverSpecification(FilterObserver.class.getName()));
final HashMap<String, String> exportParams = new HashMap<>();
final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
@@ -395,8 +390,9 @@ public abstract class ITBase {
ryaParams.setZookeeperServers(zookeepers);
ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
-
- exportObserverConfig.setParameters(exportParams);
+
+ // Configure the export observer to export new PCJ results to the mini accumulo cluster.
+ final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
observers.add(exportObserverConfig);
// Configure how the mini fluo will run.
@@ -414,7 +410,7 @@ public abstract class ITBase {
config.addObservers(observers);
FluoFactory.newAdmin(config).initialize(
- new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+ new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
return FluoFactory.newMiniFluo(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 630b86d..f7d4ee1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -34,7 +34,7 @@ import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.mini.MiniFluo;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
@@ -53,7 +53,7 @@ public class CountStatementsIT extends ITBase {
@Override
protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
// Setup the observers that will be used by the Fluo PCJ Application.
- final List<ObserverConfiguration> observers = new ArrayList<>();
+ final List<ObserverSpecification> observers = new ArrayList<>();
// Configure how the mini fluo will run.
final FluoConfiguration config = new FluoConfiguration();
@@ -70,7 +70,7 @@ public class CountStatementsIT extends ITBase {
config.addObservers(observers);
FluoFactory.newAdmin(config).initialize(
- new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+ new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
return miniFluo;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 0a1852b..d2ff98c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -23,9 +23,14 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Set;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
import org.apache.rya.indexing.pcj.fluo.ITBase;
import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
@@ -39,14 +44,6 @@ import org.openrdf.query.impl.BindingImpl;
import com.google.common.collect.Sets;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Span;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-
public class CreateDeleteIT extends ITBase {
/**
@@ -109,15 +106,12 @@ public class CreateDeleteIT extends ITBase {
private List<Bytes> getFluoTableEntries(FluoClient fluoClient) {
try (Snapshot snapshot = fluoClient.newSnapshot()) {
List<Bytes> rows = new ArrayList<>();
+ RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
- ScannerConfiguration sc1 = new ScannerConfiguration();
- sc1.setSpan(Span.prefix(""));
- RowIterator iterator = snapshot.get(sc1);
-
- while (iterator.hasNext()) {
- Entry<Bytes, ColumnIterator> row = iterator.next();
- rows.add(row.getKey());
+ for(ColumnScanner cscanner: rscanner) {
+ rows.add(cscanner.getRow());
}
+
return rows;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index bc83eed..82f568e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.rya.indexing.pcj.fluo.ITBase;
import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
@@ -101,7 +102,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
}
fluo.waitForObservers();
-
+
final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
assertEquals(expected, results);
}