You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/13 19:05:18 UTC

[5/7] incubator-rya git commit: RYA-142 Fixed Integration Tests for Fluo Update

RYA-142 Fixed Integration Tests for Fluo Update

Closes #104, closes #100


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/2139edb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/2139edb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/2139edb5

Branch: refs/heads/master
Commit: 2139edb5244c42f11c9440eced2fd0773846f856
Parents: 177c80a
Author: Caleb Meier <ca...@parsons.com>
Authored: Wed Oct 12 12:55:27 2016 -0700
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Thu Oct 13 12:57:09 2016 -0400

----------------------------------------------------------------------
 .../mvm/rya/api/client/accumulo/FluoITBase.java | 33 +++++-----
 .../apache/rya/indexing/pcj/fluo/ITBase.java    | 64 +++++++++-----------
 .../pcj/fluo/api/CountStatementsIT.java         |  6 +-
 .../pcj/fluo/integration/CreateDeleteIT.java    | 26 +++-----
 .../RyaInputIncrementalUpdateIT.java            |  3 +-
 5 files changed, 61 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
index 0dcc04c..cc50b90 100644
--- a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
+++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
@@ -55,14 +55,14 @@ import org.openrdf.sail.SailException;
 
 import com.google.common.io.Files;
 
-import io.fluo.api.client.FluoAdmin;
-import io.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import io.fluo.api.client.FluoAdmin.TableExistsException;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.mini.MiniFluo;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.mini.MiniFluo;
 import mvm.rya.accumulo.AccumuloRdfConfiguration;
 import mvm.rya.api.client.RyaClientException;
 import mvm.rya.api.client.Install;
@@ -224,17 +224,16 @@ public abstract class FluoITBase {
      */
     protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverConfiguration> observers = new ArrayList<>();
-        observers.add(new ObserverConfiguration(TripleObserver.class.getName()));
-        observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName()));
-        observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
-        observers.add(new ObserverConfiguration(FilterObserver.class.getName()));
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
 
         // Provide export parameters child test classes may provide to the
         // export observer.
-        final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(
-                QueryResultObserver.class.getName());
-        exportObserverConfig.setParameters(makeExportParams());
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(
+                QueryResultObserver.class.getName(), makeExportParams());
         observers.add(exportObserverConfig);
 
         // Configure how the mini fluo will run.
@@ -252,7 +251,7 @@ public abstract class FluoITBase {
         config.addObservers(observers);
 
         FluoFactory.newAdmin(config).initialize(
-                new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+                new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
         return FluoFactory.newMiniFluo(config);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index 9e2b9c6..a5288ec 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -25,8 +25,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -37,6 +37,20 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
@@ -63,19 +77,7 @@ import org.openrdf.repository.RepositoryConnection;
 import org.openrdf.sail.Sail;
 
 import com.google.common.io.Files;
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.mini.MiniFluo;
+
 import mvm.rya.accumulo.AccumuloRdfConfiguration;
 import mvm.rya.api.client.Install.InstallConfiguration;
 import mvm.rya.api.client.RyaClient;
@@ -283,17 +285,13 @@ public abstract class ITBase {
             final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
             final VariableOrder varOrder = queryMetadata.getVariableOrder();
 
-            // Fetch the Binding Sets for the query.
-            final ScannerConfiguration scanConfig = new ScannerConfiguration();
-            scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(),
-                    FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
-
+            CellScanner cellScanner = snapshot.scanner().fetch(FluoQueryColumns.QUERY_BINDING_SET).build();
             final BindingSetStringConverter converter = new BindingSetStringConverter();
 
-            final RowIterator rowIter = snapshot.get(scanConfig);
-            while (rowIter.hasNext()) {
-                final Entry<Bytes, ColumnIterator> row = rowIter.next();
-                final String bindingSetString = row.getValue().next().getValue().toString();
+           Iterator<RowColumnValue> iter = cellScanner.iterator();
+            
+            while (iter.hasNext()) {
+            	final String bindingSetString = iter.next().getsValue();
                 final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
                 bindingSets.add(bindingSet);
             }
@@ -378,14 +376,11 @@ public abstract class ITBase {
      */
     protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverConfiguration> observers = new ArrayList<>();
-        observers.add(new ObserverConfiguration(TripleObserver.class.getName()));
-        observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName()));
-        observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
-        observers.add(new ObserverConfiguration(FilterObserver.class.getName()));
-
-        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
-        final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName());
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
 
         final HashMap<String, String> exportParams = new HashMap<>();
         final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
@@ -395,8 +390,9 @@ public abstract class ITBase {
         ryaParams.setZookeeperServers(zookeepers);
         ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
         ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
-
-        exportObserverConfig.setParameters(exportParams);
+        
+        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
 
         // Configure how the mini fluo will run.
@@ -414,7 +410,7 @@ public abstract class ITBase {
         config.addObservers(observers);
 
         FluoFactory.newAdmin(config).initialize(
-        		new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+        		new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
         return FluoFactory.newMiniFluo(config);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 630b86d..f7d4ee1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -34,7 +34,7 @@ import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
 import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.mini.MiniFluo;
 import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.domain.RyaURI;
@@ -53,7 +53,7 @@ public class CountStatementsIT extends ITBase {
     @Override
     protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverConfiguration> observers = new ArrayList<>();
+        final List<ObserverSpecification> observers = new ArrayList<>();
 
         // Configure how the mini fluo will run.
         final FluoConfiguration config = new FluoConfiguration();
@@ -70,7 +70,7 @@ public class CountStatementsIT extends ITBase {
         config.addObservers(observers);
 
         FluoFactory.newAdmin(config).initialize(
-                new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+                new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
         final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
         return miniFluo;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 0a1852b..d2ff98c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -23,9 +23,14 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
@@ -39,14 +44,6 @@ import org.openrdf.query.impl.BindingImpl;
 
 import com.google.common.collect.Sets;
 
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Span;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-
 public class CreateDeleteIT extends ITBase {
 
     /**
@@ -109,15 +106,12 @@ public class CreateDeleteIT extends ITBase {
     private List<Bytes> getFluoTableEntries(FluoClient fluoClient) {
         try (Snapshot snapshot = fluoClient.newSnapshot()) {
             List<Bytes> rows = new ArrayList<>();
+            RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build();
 
-            ScannerConfiguration sc1 = new ScannerConfiguration();
-            sc1.setSpan(Span.prefix(""));
-            RowIterator iterator = snapshot.get(sc1);
-
-            while (iterator.hasNext()) {
-                Entry<Bytes, ColumnIterator> row = iterator.next();
-                rows.add(row.getKey());
+            for(ColumnScanner cscanner: rscanner) {
+            	rows.add(cscanner.getRow());
             }
+            
             return rows;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index bc83eed..82f568e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
@@ -101,7 +102,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
         }
 
         fluo.waitForObservers();
-
+        
         final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
         assertEquals(expected, results);
     }