You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:01:58 UTC
[5/9] incubator-rya git commit: RYA-280-Periodic Query Service.
Closes #177.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
deleted file mode 100644
index 8b38923..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
-import org.junit.Test;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.algebra.Compare;
-import org.openrdf.query.algebra.Compare.CompareOp;
-import org.openrdf.query.algebra.Filter;
-import org.openrdf.query.algebra.ValueConstant;
-import org.openrdf.query.algebra.ValueExpr;
-import org.openrdf.query.algebra.Var;
-
-import com.google.common.base.Optional;
-
-/**
- * Tests the methods of {@link FilterFinder}.
- */
-public class FilterFinderTest {
-
- @Test
- public void manyFilters() throws Exception {
- // The query that will be searched.
- final String sparql =
- "SELECT ?person ?age " +
- "{" +
- "FILTER(?age < 30) . " +
- "FILTER(?person = <http://Alice>)" +
- "?person <http://hasAge> ?age" +
- "}";
-
- // Create the expected result.
- final ValueExpr[] expected = new ValueExpr[2];
- expected[0] = new Compare(new Var("person"), new ValueConstant( new URIImpl("http://Alice") ));
- expected[1] = new Compare(new Var("age"), new ValueConstant( new LiteralImpl("30", XMLSchema.INTEGER) ), CompareOp.LT);
-
- // Run the test.
- final FilterFinder finder = new FilterFinder();
- final ValueExpr[] conditions = new ValueExpr[2];
- conditions[0] = finder.findFilter(sparql, 0).get().getCondition();
- conditions[1] = finder.findFilter(sparql, 1).get().getCondition();
- assertTrue( Arrays.equals(expected, conditions) );
- }
-
- @Test
- public void noFilterAtIndex() throws Exception {
- // The query that will be searched.
- final String sparql =
- "SELECT ?person ?age " +
- "{" +
- "FILTER(?age < 30) . " +
- "FILTER(?person = <http://Alice>)" +
- "?person <http://hasAge> ?age" +
- "}";
-
- // Run the test.
- final FilterFinder finder = new FilterFinder();
- final Optional<Filter> filter = finder.findFilter(sparql, 4);
- assertFalse( filter.isPresent() );
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
deleted file mode 100644
index 99791ee..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.fluo.api.data.Bytes;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Tests the methods of {@link VisibilityBindingSetSerDe}.
- */
-public class VisibilityBindingSetSerDeTest {
-
- @Test
- public void rountTrip() throws Exception {
- final ValueFactory vf = new ValueFactoryImpl();
-
- final MapBindingSet bs = new MapBindingSet();
- bs.addBinding("name", vf.createLiteral("Alice"));
- bs.addBinding("age", vf.createLiteral(5));
- final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u");
-
- final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe();
- final Bytes bytes = serde.serialize(original);
- final VisibilityBindingSet result = serde.deserialize(bytes);
-
- assertEquals(original, result);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
new file mode 100644
index 0000000..fe89325
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class BatchInformationSerializerTest {
+
+ @Test
+ public void testSpanBatchInformationSerialization() {
+
+ SpanBatchDeleteInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(1000)
+ .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix"))).build();
+ System.out.println(batch);
+ byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
+ Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes);
+ System.out.println(decodedBatch);
+ assertEquals(batch, decodedBatch.get());
+ }
+
+ @Test
+ public void testJoinBatchInformationSerialization() {
+
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("a", new URIImpl("urn:123"));
+ bs.addBinding("b", new URIImpl("urn:456"));
+ VisibilityBindingSet vBis = new VisibilityBindingSet(bs, "FOUO");
+
+ JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update)
+ .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346")))
+ .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b")))
+ .setBs(vBis).build();
+
+ byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
+ Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes);
+ assertEquals(batch, decodedBatch.get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
new file mode 100644
index 0000000..c8ca6af
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeRelocator;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeVisitor;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+public class PeriodicQueryUtilTest {
+
+ private static final ValueFactory vf = new ValueFactoryImpl();
+
+
+
+ @Test
+ public void periodicNodeNotPresentTest() throws Exception {
+
+ List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(12.0)), new ValueConstant(vf.createLiteral(6.0)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours")));
+ FunctionCall func = new FunctionCall("uri:func", values);
+ Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());
+ Assert.assertEquals(false, node1.isPresent());
+ }
+
+
+
+ @Test
+ public void periodicNodePresentTest() throws Exception {
+
+ List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(12.0)), new ValueConstant(vf.createLiteral(6.0)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours")));
+ FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values);
+ Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());
+ Assert.assertEquals(true, node1.isPresent());
+
+ PeriodicQueryNode node2 = new PeriodicQueryNode(12*60*60*1000L, 6*3600*1000L, TimeUnit.MILLISECONDS, "time", new Join());
+
+ Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node1.get(), node2));
+ }
+
+
+ @Test
+ public void periodicNodeFractionalDurationTest() throws Exception {
+
+ List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(1)), new ValueConstant(vf.createLiteral(.5)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours")));
+ FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values);
+ Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());
+ Assert.assertEquals(true, node1.isPresent());
+
+ double window = 1*60*60*1000;
+ double period = .5*3600*1000;
+
+ PeriodicQueryNode node2 = new PeriodicQueryNode((long) window, (long) period, TimeUnit.MILLISECONDS, "time", new Join());
+
+ Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node1.get(), node2));
+ }
+
+ @Test
+ public void testPeriodicNodePlacement() throws MalformedQueryException {
+ String query = "prefix function: <http://org.apache.rya/function#> " //n
+ + "prefix time: <http://www.w3.org/2006/time#> " //n
+ + "prefix fn: <http://www.w3.org/2006/fn#> " //n
+ + "select ?obs ?time ?lat where {" //n
+ + "Filter(function:periodic(?time, 12.0, 6.0,time:hours)) " //n
+ + "Filter(fn:test(?lat, 25)) " //n
+ + "?obs <uri:hasTime> ?time. " //n
+ + "?obs <uri:hasLattitude> ?lat }"; //n
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ TupleExpr te = pq.getTupleExpr();
+ te.visit(new PeriodicQueryNodeVisitor());
+
+ PeriodicNodeCollector collector = new PeriodicNodeCollector();
+ te.visit(collector);
+
+ PeriodicQueryNode node2 = new PeriodicQueryNode(12*60*60*1000L, 6*3600*1000L, TimeUnit.MILLISECONDS, "time", new Join());
+
+ Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node2, collector.getPeriodicQueryNode()));
+
+ }
+
+ @Test
+ public void testPeriodicNodeLocation() throws MalformedQueryException {
+ String query = "prefix function: <http://org.apache.rya/function#> " //n
+ + "prefix time: <http://www.w3.org/2006/time#> " //n
+ + "prefix fn: <http://www.w3.org/2006/fn#> " //n
+ + "select ?obs ?time ?lat where {" //n
+ + "Filter(function:periodic(?time, 1,.5,time:hours)) " //n
+ + "Filter(fn:test(?lat, 25)) " //n
+ + "?obs <uri:hasTime> ?time. " //n
+ + "?obs <uri:hasLattitude> ?lat }"; //n
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ TupleExpr te = pq.getTupleExpr();
+ te.visit(new PeriodicQueryNodeVisitor());
+
+ PeriodicNodeCollector collector = new PeriodicNodeCollector();
+ te.visit(collector);
+ Assert.assertEquals(2, collector.getPos());
+
+ te.visit(new PeriodicQueryNodeRelocator());
+ collector.resetCount();
+ te.visit(collector);
+ Assert.assertEquals(1, collector.getPos());
+
+ double window = 1*60*60*1000;
+ double period = .5*3600*1000;
+ PeriodicQueryNode node2 = new PeriodicQueryNode((long) window, (long) period, TimeUnit.MILLISECONDS, "time", new Join());
+ Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node2, collector.getPeriodicQueryNode()));
+
+ }
+
+ @Test
+ public void testFluoQueryVarOrders() throws MalformedQueryException {
+ String query = "prefix function: <http://org.apache.rya/function#> " //n
+ + "prefix time: <http://www.w3.org/2006/time#> " //n
+ + "select (count(?obs) as ?total) where {" //n
+ + "Filter(function:periodic(?time, 12.4, 6.2,time:hours)) " //n
+ + "?obs <uri:hasTime> ?time. " //n
+ + "?obs <uri:hasLattitude> ?lat }"; //n
+
+ SPARQLParser parser = new SPARQLParser();
+ ParsedQuery pq = parser.parseQuery(query, null);
+ SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+ FluoQuery fluoQuery = builder.make(pq, new NodeIds());
+
+ PeriodicQueryMetadata periodicMeta = fluoQuery.getPeriodicQueryMetadata().orNull();
+ Assert.assertEquals(true, periodicMeta != null);
+ VariableOrder periodicVars = periodicMeta.getVariableOrder();
+ Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, periodicVars.getVariableOrders().get(0));
+
+ QueryMetadata queryMeta = fluoQuery.getQueryMetadata().get();
+ VariableOrder queryVars = queryMeta.getVariableOrder();
+ Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, queryVars.getVariableOrders().get(0));
+
+ Collection<AggregationMetadata> aggMetaCollection = fluoQuery.getAggregationMetadata();
+ Assert.assertEquals(1, aggMetaCollection.size());
+ AggregationMetadata aggMeta = aggMetaCollection.iterator().next();
+ VariableOrder aggVars = aggMeta.getVariableOrder();
+ Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, aggVars.getVariableOrders().get(0));
+
+ System.out.println(fluoQuery);
+ }
+
+ private boolean periodicNodesEqualIgnoreArg(PeriodicQueryNode node1, PeriodicQueryNode node2) {
+ return new EqualsBuilder().append(node1.getPeriod(), node2.getPeriod()).append(node1.getWindowSize(), node2.getWindowSize())
+ .append(node1.getTemporalVariable(), node2.getTemporalVariable()).append(node1.getUnit(), node2.getUnit()).build();
+ }
+
+ private static class PeriodicNodeCollector extends QueryModelVisitorBase<RuntimeException>{
+
+ private PeriodicQueryNode periodicNode;
+ int count = 0;
+
+ public PeriodicQueryNode getPeriodicQueryNode() {
+ return periodicNode;
+ }
+
+ public int getPos() {
+ return count;
+ }
+
+ public void resetCount() {
+ count = 0;
+ }
+
+ public void meet(Filter node) {
+ count++;
+ node.getArg().visit(this);
+ }
+
+ public void meet(Projection node) {
+ count++;
+ node.getArg().visit(this);
+ }
+
+ @Override
+ public void meetOther(QueryModelNode node) {
+ if(node instanceof PeriodicQueryNode) {
+ periodicNode = (PeriodicQueryNode) node;
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index 99ccc58..7a73b41 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -83,8 +83,7 @@ public class QueryReportRenderer {
builder.appendItem( new ReportItem("FILTER NODE") );
builder.appendItem( new ReportItem("Node ID", filterMetadata.getNodeId()) );
builder.appendItem( new ReportItem("Variable Order", filterMetadata.getVariableOrder().toString()) );
- builder.appendItem( new ReportItem("Original SPARQL", prettyFormatSparql( filterMetadata.getOriginalSparql()) ) );
- builder.appendItem( new ReportItem("Filter Index", "" + filterMetadata.getFilterIndexWithinSparql()) );
+ builder.appendItem( new ReportItem("Filter SPARQL", prettyFormatSparql( filterMetadata.getFilterSparql())));
builder.appendItem( new ReportItem("Parent Node ID", filterMetadata.getParentNodeId()) );
builder.appendItem( new ReportItem("Child Node ID", filterMetadata.getChildNodeId()) );
builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(filterMetadata.getNodeId())) );
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 6467191..9591e55 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -43,6 +43,11 @@
<artifactId>rya.indexing</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.test.base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
deleted file mode 100644
index b5d9428..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
+++ /dev/null
@@ -1,282 +0,0 @@
-package org.apache.rya.indexing.pcj.fluo;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.net.UnknownHostException;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
-import org.apache.rya.accumulo.MiniAccumuloSingleton;
-import org.apache.rya.accumulo.RyaTestInstanceRule;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloInstall;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailException;
-
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.RyaClientException;
-import org.apache.rya.api.client.Install;
-import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
-import org.apache.rya.sail.config.RyaSailFactory;
-
-/**
- * Integration tests that ensure the Fluo application processes PCJs results
- * correctly.
- * <p>
- * This class is being ignored because it doesn't contain any unit tests.
- */
-public abstract class FluoITBase {
- private static final Logger log = Logger.getLogger(FluoITBase.class);
-
- // Mini Accumulo Cluster
- private static MiniAccumuloClusterInstance clusterInstance = MiniAccumuloSingleton.getInstance();
- private static MiniAccumuloCluster cluster;
-
- private static String instanceName = null;
- private static String zookeepers = null;
-
- protected static Connector accumuloConn = null;
-
- // Fluo data store and connections.
- protected MiniFluo fluo = null;
- protected FluoConfiguration fluoConfig = null;
- protected FluoClient fluoClient = null;
-
- // Rya data store and connections.
- protected RyaSailRepository ryaRepo = null;
- protected RepositoryConnection ryaConn = null;
-
- @Rule
- public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false);
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
-
- // Setup and start the Mini Accumulo.
- cluster = clusterInstance.getCluster();
-
- // Store a connector to the Mini Accumulo.
- instanceName = cluster.getInstanceName();
- zookeepers = cluster.getZooKeepers();
-
- final Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
- accumuloConn = instance.getConnector(clusterInstance.getUsername(), new PasswordToken(clusterInstance.getPassword()));
- }
-
- @Before
- public void setupMiniResources() throws Exception {
- // Initialize the Mini Fluo that will be used to store created queries.
- fluoConfig = createFluoConfig();
- preFluoInitHook();
- FluoFactory.newAdmin(fluoConfig).initialize(new FluoAdmin.InitializationOptions()
- .setClearTable(true)
- .setClearZookeeper(true));
- postFluoInitHook();
- fluo = FluoFactory.newMiniFluo(fluoConfig);
- fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
-
- // Initialize the Rya that will be used by the tests.
- ryaRepo = setupRya();
- ryaConn = ryaRepo.getConnection();
- }
-
- @After
- public void shutdownMiniResources() {
- if (ryaConn != null) {
- try {
- log.info("Shutting down Rya Connection.");
- ryaConn.close();
- log.info("Rya Connection shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Rya Connection.", e);
- }
- }
-
- if (ryaRepo != null) {
- try {
- log.info("Shutting down Rya Repo.");
- ryaRepo.shutDown();
- log.info("Rya Repo shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Rya Repo.", e);
- }
- }
-
- if (fluoClient != null) {
- try {
- log.info("Shutting down Fluo Client.");
- fluoClient.close();
- log.info("Fluo Client shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Fluo Client.", e);
- }
- }
-
- if (fluo != null) {
- try {
- log.info("Shutting down Mini Fluo.");
- fluo.close();
- log.info("Mini Fluo shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Mini Fluo.", e);
- }
- }
- }
-
- protected void preFluoInitHook() throws Exception {
-
- }
-
- protected void postFluoInitHook() throws Exception {
-
- }
-
- protected MiniAccumuloCluster getMiniAccumuloCluster() {
- return cluster;
- }
-
- protected MiniFluo getMiniFluo() {
- return fluo;
- }
-
- public RyaSailRepository getRyaSailRepository() {
- return ryaRepo;
- }
-
- public Connector getAccumuloConnector() {
- return accumuloConn;
- }
-
- public String getRyaInstanceName() {
- return testInstance.getRyaInstanceName();
- }
-
- protected String getUsername() {
- return clusterInstance.getUsername();
- }
-
- protected String getPassword() {
- return clusterInstance.getPassword();
- }
-
- protected FluoConfiguration getFluoConfiguration() {
- return fluoConfig;
- }
-
- public AccumuloConnectionDetails createConnectionDetails() {
- return new AccumuloConnectionDetails(
- clusterInstance.getUsername(),
- clusterInstance.getPassword().toCharArray(),
- clusterInstance.getInstanceName(),
- clusterInstance.getZookeepers());
- }
-
- private FluoConfiguration createFluoConfig() {
- // Configure how the mini fluo will run.
- final FluoConfiguration config = new FluoConfiguration();
- config.setMiniStartAccumulo(false);
- config.setAccumuloInstance(instanceName);
- config.setAccumuloUser(clusterInstance.getUsername());
- config.setAccumuloPassword(clusterInstance.getPassword());
- config.setInstanceZookeepers(zookeepers + "/fluo");
- config.setAccumuloZookeepers(zookeepers);
-
- config.setApplicationName(getRyaInstanceName());
- config.setAccumuloTable("fluo" + getRyaInstanceName());
- return config;
- }
-
- /**
- * Sets up a Rya instance.
- */
- protected RyaSailRepository setupRya()
- throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException,
- NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException,
- RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException {
- checkNotNull(instanceName);
- checkNotNull(zookeepers);
-
- // Setup Rya configuration values.
- final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix(getRyaInstanceName());
- conf.setDisplayQueryPlan(true);
- conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false);
- conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername());
- conf.set(ConfigUtils.CLOUDBASE_PASSWORD, clusterInstance.getPassword());
- conf.set(ConfigUtils.CLOUDBASE_INSTANCE, clusterInstance.getInstanceName());
- conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, clusterInstance.getZookeepers());
- conf.set(ConfigUtils.USE_PCJ, "true");
- conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName());
- conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
- conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
- conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
-
- // Install the test instance of Rya.
- final Install install = new AccumuloInstall(createConnectionDetails(), accumuloConn);
-
- final InstallConfiguration installConfig = InstallConfiguration.builder()
- .setEnableTableHashPrefix(true)
- .setEnableEntityCentricIndex(true)
- .setEnableFreeTextIndex(true)
- .setEnableTemporalIndex(true)
- .setEnablePcjIndex(true)
- .setEnableGeoIndex(true)
- .setFluoPcjAppName(getRyaInstanceName())
- .build();
- install.install(getRyaInstanceName(), installConfig);
-
- // Connect to the instance of Rya that was just installed.
- final Sail sail = RyaSailFactory.getInstance(conf);
- final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
-
- return ryaRepo;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
deleted file mode 100644
index 452dd27..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo;
-
-import static java.util.Objects.requireNonNull;
-import static org.junit.Assert.assertEquals;
-
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
-import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.sail.config.RyaSailFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-import org.openrdf.sail.Sail;
-
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
-/**
- * The base Integration Test class used for Fluo applications that export to a
- * Kakfa topic.
- */
-public class KafkaExportITBase extends AccumuloExportITBase {
-
- protected static final String RYA_INSTANCE_NAME = "test_";
-
- private static final String ZKHOST = "127.0.0.1";
- private static final String BROKERHOST = "127.0.0.1";
- private static final String BROKERPORT = "9092";
- private ZkUtils zkUtils;
- private KafkaServer kafkaServer;
- private EmbeddedZookeeper zkServer;
- private ZkClient zkClient;
-
- // The Rya instance statements are written to that will be fed into the Fluo
- // app.
- private RyaSailRepository ryaSailRepo = null;
- private AccumuloRyaDAO dao = null;
-
- /**
- * Add info about the Kafka queue/topic to receive the export.
- */
- @Override
- protected void preFluoInitHook() throws Exception {
- // Setup the observers that will be used by the Fluo PCJ Application.
- final List<ObserverSpecification> observers = new ArrayList<>();
- observers.add(new ObserverSpecification(TripleObserver.class.getName()));
- observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
- observers.add(new ObserverSpecification(JoinObserver.class.getName()));
- observers.add(new ObserverSpecification(FilterObserver.class.getName()));
- observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
-
- // Configure the export observer to export new PCJ results to the mini
- // accumulo cluster.
- final HashMap<String, String> exportParams = new HashMap<>();
-
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
- kafkaParams.setExportToKafka(true);
-
- // Configure the Kafka Producer
- final Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
- kafkaParams.addAllProducerConfig(producerConfig);
-
- final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
- observers.add(exportObserverConfig);
-
- //create construct query observer and tell it not to export to Kafka
- //it will only add results back into Fluo
- HashMap<String, String> constructParams = new HashMap<>();
- final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams);
- kafkaConstructParams.setExportToKafka(true);
-
- // Configure the Kafka Producer
- final Properties constructProducerConfig = new Properties();
- constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
- constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
- kafkaConstructParams.addAllProducerConfig(constructProducerConfig);
-
- final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
- constructParams);
- observers.add(constructExportObserverConfig);
-
- // Add the observers to the Fluo Configuration.
- super.getFluoConfiguration().addObservers(observers);
- }
-
- /**
- * setup mini kafka and call the super to setup mini fluo
- */
- @Before
- public void setupKafka() throws Exception {
- // Install an instance of Rya on the Accumulo cluster.
- installRyaInstance();
-
- // Setup Kafka.
- zkServer = new EmbeddedZookeeper();
- final String zkConnect = ZKHOST + ":" + zkServer.port();
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
- zkUtils = ZkUtils.apply(zkClient, false);
-
- // setup Broker
- final Properties brokerProps = new Properties();
- brokerProps.setProperty("zookeeper.connect", zkConnect);
- brokerProps.setProperty("broker.id", "0");
- brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
- brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
- final KafkaConfig config = new KafkaConfig(brokerProps);
- final Time mock = new MockTime();
- kafkaServer = TestUtils.createServer(config, mock);
- }
-
- @After
- public void teardownRya() {
- final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
- final String instanceName = cluster.getInstanceName();
- final String zookeepers = cluster.getZooKeepers();
-
- // Uninstall the instance of Rya.
- final RyaClient ryaClient = AccumuloRyaClientFactory.build(
- new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
- super.getAccumuloConnector());
-
- try {
- ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
- // Shutdown the repo.
- if(ryaSailRepo != null) {ryaSailRepo.shutDown();}
- if(dao != null ) {dao.destroy();}
- } catch (Exception e) {
- System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
- }
- }
-
- private void installRyaInstance() throws Exception {
- final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
- final String instanceName = cluster.getInstanceName();
- final String zookeepers = cluster.getZooKeepers();
-
- // Install the Rya instance to the mini accumulo cluster.
- final RyaClient ryaClient = AccumuloRyaClientFactory.build(
- new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
- super.getAccumuloConnector());
-
- ryaClient.getInstall().install(RYA_INSTANCE_NAME,
- InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false)
- .setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true)
- .setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build());
-
- // Connect to the Rya instance that was just installed.
- final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
- final Sail sail = RyaSailFactory.getInstance(conf);
- dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf);
- ryaSailRepo = new RyaSailRepository(sail);
- }
-
- protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
- final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix(RYA_INSTANCE_NAME);
-
- // Accumulo connection information.
- conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
- conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
- conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
- conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
- conf.setAuths("");
-
- // PCJ configuration information.
- conf.set(ConfigUtils.USE_PCJ, "true");
- conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
- conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName());
- conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
- conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
-
- conf.setDisplayQueryPlan(true);
-
- return conf;
- }
-
- /**
- * @return A {@link RyaSailRepository} that is connected to the Rya instance
- * that statements are loaded into.
- */
- protected RyaSailRepository getRyaSailRepository() throws Exception {
- return ryaSailRepo;
- }
-
- /**
- * @return A {@link AccumuloRyaDAO} so that RyaStatements with distinct
- * visibilities can be added to the Rya Instance
- */
- protected AccumuloRyaDAO getRyaDAO() {
- return dao;
- }
-
- /**
- * Close all the Kafka mini server and mini-zookeeper
- */
- @After
- public void teardownKafka() {
- if(kafkaServer != null) {kafkaServer.shutdown();}
- if(zkClient != null) {zkClient.close();}
- if(zkServer != null) {zkServer.shutdown();}
- }
-
- /**
- * Test kafka without rya code to make sure kafka works in this environment.
- * If this test fails then its a testing environment issue, not with Rya.
- * Source: https://github.com/asmaier/mini-kafka
- */
- @Test
- public void embeddedKafkaTest() throws Exception {
- // create topic
- final String topic = "testTopic";
- AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-
- // setup producer
- final Properties producerProps = new Properties();
- producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
- producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
- producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps);
-
- // setup consumer
- final Properties consumerProps = new Properties();
- consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
- consumerProps.setProperty("group.id", "group0");
- consumerProps.setProperty("client.id", "consumer0");
- consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-
- // to make sure the consumer starts from the beginning of the topic
- consumerProps.put("auto.offset.reset", "earliest");
-
- final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
- consumer.subscribe(Arrays.asList(topic));
-
- // send message
- final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8));
- producer.send(data);
- producer.close();
-
- // starting consumer
- final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
- assertEquals(1, records.count());
- final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
- final ConsumerRecord<Integer, byte[]> record = recordIterator.next();
- assertEquals(42, (int) record.key());
- assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
- consumer.close();
- }
-
- protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) {
- // setup consumer
- final Properties consumerProps = new Properties();
- consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
- consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
- consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
- consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.IntegerDeserializer");
- consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
-
- // to make sure the consumer starts from the beginning of the topic
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
- consumer.subscribe(Arrays.asList(TopicName));
- return consumer;
- }
-
- protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
- requireNonNull(sparql);
- requireNonNull(statements);
-
- // Register the PCJ with Rya.
- final Instance accInstance = super.getAccumuloConnector().getInstance();
- final Connector accumuloConn = super.getAccumuloConnector();
-
- final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
- ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
-
- final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
-
- // Write the data to Rya.
- final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
- ryaConn.begin();
- ryaConn.add(statements);
- ryaConn.commit();
- ryaConn.close();
-
- // Wait for the Fluo application to finish computing the end result.
- super.getMiniFluo().waitForObservers();
-
- // The PCJ Id is the topic name the results will be written to.
- return pcjId;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
deleted file mode 100644
index 4eab0f6..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.junit.BeforeClass;
-
-/**
- * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index.
- */
-public class RyaExportITBase extends FluoITBase {
-
- @BeforeClass
- public static void setupLogging() {
- BasicConfigurator.configure();
- Logger.getRootLogger().setLevel(Level.ERROR);
- }
-
- @Override
- protected void preFluoInitHook() throws Exception {
- // Setup the observers that will be used by the Fluo PCJ Application.
- final List<ObserverSpecification> observers = new ArrayList<>();
- observers.add(new ObserverSpecification(TripleObserver.class.getName()));
- observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
- observers.add(new ObserverSpecification(JoinObserver.class.getName()));
- observers.add(new ObserverSpecification(FilterObserver.class.getName()));
- observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
-
- // Configure the export observer to export new PCJ results to the mini accumulo cluster.
- final HashMap<String, String> exportParams = new HashMap<>();
- final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
- ryaParams.setExportToRya(true);
- ryaParams.setRyaInstanceName(getRyaInstanceName());
- ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
- ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
- ryaParams.setExporterUsername(getUsername());
- ryaParams.setExporterPassword(getPassword());
-
- final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
- observers.add(exportObserverConfig);
-
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
- kafkaParams.setExportToKafka(false);
-
- final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
- exportParams);
- observers.add(constructExportObserverConfig);
-
- // Add the observers to the Fluo Configuration.
- super.getFluoConfiguration().addObservers(observers);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 3a42a23..cb34d06 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -29,7 +29,7 @@ import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
import com.google.common.base.Optional;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 9a1c285..d5c0e5f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -39,6 +38,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.QueryEvaluationException;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index d19646e..965a7b9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -31,13 +31,13 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
import com.google.common.base.Optional;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index ec301ba..e3914bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.client.TableExistsException;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Transaction;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
import com.beust.jcommander.internal.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index accabbf..d403404 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -21,12 +21,12 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
import static org.junit.Assert.assertEquals;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
@@ -34,6 +34,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.StatementPattern;
@@ -42,8 +43,6 @@ import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;
import org.openrdf.repository.RepositoryException;
-import com.google.common.base.Optional;
-
/**
* Integration tests the methods of {@link FluoQueryMetadataDAO}.
*/
@@ -87,8 +86,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
builder.setVarOrder(new VariableOrder("e;f"));
builder.setParentNodeId("parentNodeId");
builder.setChildNodeId("childNodeId");
- builder.setOriginalSparql("originalSparql");
- builder.setFilterIndexWithinSparql(2);
+ builder.setFilterSparql("originalSparql");
final FilterMetadata originalMetadata = builder.build();
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -232,11 +230,10 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
storedMetadata = dao.readAggregationMetadata(sx, "nodeId");
}
- // Ensure the deserialized object is the same as the serialized one.
- assertEquals(originalMetadata, storedMetadata);
}
}
+
@Test
public void aggregationMetadataTest_noGroupByVarOrders() {
final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
@@ -267,6 +264,41 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
assertEquals(originalMetadata, storedMetadata);
}
}
+
+ @Test
+ public void periodicQueryMetadataTest() {
+ final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ // Create the object that will be serialized.
+ PeriodicQueryMetadata originalMetadata = PeriodicQueryMetadata.builder()
+ .setNodeId("nodeId")
+ .setParentNodeId("parentNodeId")
+ .setVarOrder(new VariableOrder("a","b","c"))
+ .setChildNodeId("childNodeId")
+ .setPeriod(10)
+ .setWindowSize(20)
+ .setUnit(TimeUnit.DAYS)
+ .setTemporalVariable("a")
+ .build();
+
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Write it to the Fluo table.
+ try(Transaction tx = fluoClient.newTransaction()) {
+ dao.write(tx, originalMetadata);
+ tx.commit();
+ }
+
+ // Read it from the Fluo table.
+ PeriodicQueryMetadata storedMetadata = null;
+ try(Snapshot sx = fluoClient.newSnapshot()) {
+ storedMetadata = dao.readPeriodicQueryMetadata(sx, "nodeId");
+ }
+
+ // Ensure the deserialized object is the same as the serialized one.
+ assertEquals(originalMetadata, storedMetadata);
+ }
+ }
@Test
public void fluoQueryTest() throws MalformedQueryException {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
new file mode 100644
index 0000000..0cd7cfb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+public class BatchDeleteIT extends RyaExportITBase {
+
+ private static final Logger log = Logger.getLogger(BatchDeleteIT.class);
+ private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ @Test
+ public void simpleScanDelete() throws Exception {
+
+ final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+ + " <urn:predicate_2> ?object2 } ";
+ try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+ RyaURI subj = new RyaURI("urn:subject_1");
+ RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+ RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+ Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+ Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+
+ // Create the PCJ table.
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Tell the Fluo app to maintain the PCJ.
+ String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+
+ List<String> ids = getNodeIdStrings(fluoClient, queryId);
+ List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
+
+ // Stream the data into Fluo.
+ InsertTriples inserter = new InsertTriples();
+ inserter.insert(fluoClient, statements1, Optional.<String> absent());
+ inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+ // Verify the end results of the query match the expected results.
+ getMiniFluo().waitForObservers();
+
+ verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 10, 10));
+
+ createSpanBatches(fluoClient, ids, prefixes, 10);
+ getMiniFluo().waitForObservers();
+
+ verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0));
+ }
+ }
+
+ @Test
+ public void simpleJoinDelete() throws Exception {
+ final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+ + " <urn:predicate_2> ?object2 } ";
+ try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+ RyaURI subj = new RyaURI("urn:subject_1");
+ RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+ RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+ Set<RyaStatement> statements1 = getRyaStatements(statement1, 5);
+ Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+ // Create the PCJ table.
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Tell the Fluo app to maintain the PCJ.
+ String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+
+ List<String> ids = getNodeIdStrings(fluoClient, queryId);
+ String joinId = ids.get(1);
+ String rightSp = ids.get(3);
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("subject", new URIImpl("urn:subject_1"));
+ bs.addBinding("object1", new URIImpl("urn:object_0"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+ Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+ VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
+
+ // Stream the data into Fluo.
+ InsertTriples inserter = new InsertTriples();
+ inserter.insert(fluoClient, statements1, Optional.<String> absent());
+ inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+ getMiniFluo().waitForObservers();
+ verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 5, 5));
+
+ JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+ .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete)
+ .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
+ // Verify the end results of the query match the expected results.
+ createSpanBatch(fluoClient, joinId, batch);
+
+ getMiniFluo().waitForObservers();
+ verifyCounts(fluoClient, ids, Arrays.asList(25, 20, 5, 5));
+ }
+ }
+
+ @Test
+ public void simpleJoinAdd() throws Exception {
+ final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+ + " <urn:predicate_2> ?object2 } ";
+ try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+ RyaURI subj = new RyaURI("urn:subject_1");
+ RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+ Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+ // Create the PCJ table.
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Tell the Fluo app to maintain the PCJ.
+ String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
+
+ List<String> ids = getNodeIdStrings(fluoClient, queryId);
+ String joinId = ids.get(1);
+ String rightSp = ids.get(3);
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("subject", new URIImpl("urn:subject_1"));
+ bs.addBinding("object1", new URIImpl("urn:object_0"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+ Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+ VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
+
+ // Stream the data into Fluo.
+ InsertTriples inserter = new InsertTriples();
+ inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+ getMiniFluo().waitForObservers();
+ verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 5));
+
+ JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+ .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add)
+ .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
+ // Verify the end results of the query match the expected results.
+ createSpanBatch(fluoClient, joinId, batch);
+
+ getMiniFluo().waitForObservers();
+ verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 0, 5));
+ }
+ }
+
+ private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
+
+ Set<RyaStatement> statements = new HashSet<>();
+ final String subject = "urn:subject_";
+ final String predicate = "urn:predicate_";
+ final String object = "urn:object_";
+
+ for (int i = 0; i < numTriples; i++) {
+ RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject());
+ if (stmnt.getSubject() == null) {
+ stmnt.setSubject(new RyaURI(subject + i));
+ }
+ if (stmnt.getPredicate() == null) {
+ stmnt.setPredicate(new RyaURI(predicate + i));
+ }
+ if (stmnt.getObject() == null) {
+ stmnt.setObject(new RyaURI(object + i));
+ }
+ statements.add(stmnt);
+ }
+ return statements;
+ }
+
+ private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
+ List<String> nodeStrings = new ArrayList<>();
+ try (Snapshot sx = fluoClient.newSnapshot()) {
+ FluoQuery query = dao.readFluoQuery(sx, queryId);
+ nodeStrings.add(queryId);
+ Collection<JoinMetadata> jMeta = query.getJoinMetadata();
+ for (JoinMetadata meta : jMeta) {
+ nodeStrings.add(meta.getNodeId());
+ nodeStrings.add(meta.getLeftChildNodeId());
+ nodeStrings.add(meta.getRightChildNodeId());
+ }
+ }
+ return nodeStrings;
+ }
+
+ private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) {
+
+ Preconditions.checkArgument(ids.size() == prefixes.size());
+
+ try (Transaction tx = fluoClient.newTransaction()) {
+ for (int i = 0; i < ids.size(); i++) {
+ String id = ids.get(i);
+ String bsPrefix = prefixes.get(i);
+ NodeType type = NodeType.fromNodeId(id).get();
+ Column bsCol = type.getResultColumn();
+ String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
+ Span span = Span.prefix(Bytes.of(row));
+ BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
+ .build();
+ BatchInformationDAO.addBatch(tx, id, batch);
+ }
+ tx.commit();
+ }
+ }
+
+ private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) {
+ try (Transaction tx = fluoClient.newTransaction()) {
+ BatchInformationDAO.addBatch(tx, nodeId, batch);
+ tx.commit();
+ }
+ }
+
+ private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
+ try (Transaction tx = fluoClient.newTransaction()) {
+ int count = 0;
+ RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
+ Iterator<ColumnScanner> colScanners = scanner.iterator();
+ while (colScanners.hasNext()) {
+ ColumnScanner colScanner = colScanners.next();
+ Iterator<ColumnValue> vals = colScanner.iterator();
+ while (vals.hasNext()) {
+ vals.next();
+ count++;
+ }
+ }
+ tx.commit();
+ return count;
+ }
+ }
+
+ private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) {
+ Preconditions.checkArgument(ids.size() == expectedCounts.size());
+ for (int i = 0; i < ids.size(); i++) {
+ String id = ids.get(i);
+ int expected = expectedCounts.get(i);
+ NodeType type = NodeType.fromNodeId(id).get();
+ int count = countResults(fluoClient, id, type.getResultColumn());
+ log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
+ switch (type) {
+ case STATEMENT_PATTERN:
+ assertEquals(expected, count);
+ break;
+ case JOIN:
+ assertEquals(expected, count);
+ break;
+ case QUERY:
+ assertEquals(expected, count);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 414fa70..0f2d892 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -35,8 +35,8 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Span;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;