You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/04/24 15:06:23 UTC
[5/9] incubator-rya git commit: RYA-260 Fluo PCJ application has had
Aggregation support added to it. Also fixed a bunch of resource leaks that
were causing integration tests to fail. Closes #156.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index f9acb11..dfc3333 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -18,23 +18,30 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.query;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
import java.util.Map;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import com.google.common.collect.Sets;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
/**
* Reads and writes {@link FluoQuery} instances and their components to/from
@@ -50,8 +57,8 @@ public class FluoQueryMetadataDAO {
* @param metadata - The Query node metadata that will be written to the table. (not null)
*/
public void write(final TransactionBase tx, final QueryMetadata metadata) {
- checkNotNull(tx);
- checkNotNull(metadata);
+ requireNonNull(tx);
+ requireNonNull(metadata);
final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId);
@@ -65,19 +72,19 @@ public class FluoQueryMetadataDAO {
*
* @param sx - The snapshot that will be used to read the metadata . (not null)
* @param nodeId - The nodeId of the Query node that will be read. (not nul)
- * @return The {@link QueryMetadata} that was read from table.
+ * @return The {@link QueryMetadata} that was read from the table.
*/
public QueryMetadata readQueryMetadata(final SnapshotBase sx, final String nodeId) {
return readQueryMetadataBuilder(sx, nodeId).build();
}
private QueryMetadata.Builder readQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) {
- checkNotNull(sx);
- checkNotNull(nodeId);
+ requireNonNull(sx);
+ requireNonNull(nodeId);
// Fetch the values from the Fluo table.
final String rowId = nodeId;
- final Map<Column, String> values = sx.gets(rowId,
+ final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.QUERY_VARIABLE_ORDER,
FluoQueryColumns.QUERY_SPARQL,
FluoQueryColumns.QUERY_CHILD_NODE_ID);
@@ -102,8 +109,8 @@ public class FluoQueryMetadataDAO {
* @param metadata - The Filter node metadata that will be written to the table. (not null)
*/
public void write(final TransactionBase tx, final FilterMetadata metadata) {
- checkNotNull(tx);
- checkNotNull(metadata);
+ requireNonNull(tx);
+ requireNonNull(metadata);
final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId);
@@ -119,19 +126,19 @@ public class FluoQueryMetadataDAO {
*
* @param sx - The snapshot that will be used to read the metadata. (not null)
* @param nodeId - The nodeId of the Filter node that will be read. (not nul)
- * @return The {@link FilterMetadata} that was read from table.
+ * @return The {@link FilterMetadata} that was read from the table.
*/
public FilterMetadata readFilterMetadata(final SnapshotBase sx, final String nodeId) {
return readFilterMetadataBuilder(sx, nodeId).build();
}
private FilterMetadata.Builder readFilterMetadataBuilder(final SnapshotBase sx, final String nodeId) {
- checkNotNull(sx);
- checkNotNull(nodeId);
+ requireNonNull(sx);
+ requireNonNull(nodeId);
// Fetch the values from the Fluo table.
final String rowId = nodeId;
- final Map<Column, String> values = sx.gets(rowId,
+ final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.FILTER_VARIABLE_ORDER,
FluoQueryColumns.FILTER_ORIGINAL_SPARQL,
FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL,
@@ -162,8 +169,8 @@ public class FluoQueryMetadataDAO {
* @param metadata - The Join node metadata that will be written to the table. (not null)
*/
public void write(final TransactionBase tx, final JoinMetadata metadata) {
- checkNotNull(tx);
- checkNotNull(metadata);
+ requireNonNull(tx);
+ requireNonNull(metadata);
final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId);
@@ -179,15 +186,15 @@ public class FluoQueryMetadataDAO {
*
* @param sx - The snapshot that will be used to read the metadata. (not null)
* @param nodeId - The nodeId of the Join node that will be read. (not nul)
- * @return The {@link JoinMetadata} that was read from table.
+ * @return The {@link JoinMetadata} that was read from the table.
*/
public JoinMetadata readJoinMetadata(final SnapshotBase sx, final String nodeId) {
return readJoinMetadataBuilder(sx, nodeId).build();
}
private JoinMetadata.Builder readJoinMetadataBuilder(final SnapshotBase sx, final String nodeId) {
- checkNotNull(sx);
- checkNotNull(nodeId);
+ requireNonNull(sx);
+ requireNonNull(nodeId);
// Fetch the values from the Fluo table.
final String rowId = nodeId;
@@ -224,8 +231,8 @@ public class FluoQueryMetadataDAO {
* @param metadata - The Statement Pattern node metadata that will be written to the table. (not null)
*/
public void write(final TransactionBase tx, final StatementPatternMetadata metadata) {
- checkNotNull(tx);
- checkNotNull(metadata);
+ requireNonNull(tx);
+ requireNonNull(metadata);
final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_NODE_ID, rowId);
@@ -239,15 +246,15 @@ public class FluoQueryMetadataDAO {
*
* @param sx - The snapshot that will be used to read the metadata. (not null)
* @param nodeId - The nodeId of the Statement Pattern node that will be read. (not nul)
- * @return The {@link StatementPatternMetadata} that was read from table.
+ * @return The {@link StatementPatternMetadata} that was read from the table.
*/
public StatementPatternMetadata readStatementPatternMetadata(final SnapshotBase sx, final String nodeId) {
return readStatementPatternMetadataBuilder(sx, nodeId).build();
}
private StatementPatternMetadata.Builder readStatementPatternMetadataBuilder(final SnapshotBase sx, final String nodeId) {
- checkNotNull(sx);
- checkNotNull(nodeId);
+ requireNonNull(sx);
+ requireNonNull(nodeId);
// Fetch the values from the Fluo table.
final String rowId = nodeId;
@@ -270,14 +277,104 @@ public class FluoQueryMetadataDAO {
}
/**
+ * Write an instance of {@link AggregationMetadata} to the Fluo table.
+ *
+ * @param tx - The transaction that will be used to commit the metadata. (not null)
+ * @param metadata - The Aggregation node metadata that will be written to the table. (not null)
+ */
+ public void write(final TransactionBase tx, final AggregationMetadata metadata) {
+ requireNonNull(tx);
+ requireNonNull(metadata);
+
+ final String rowId = metadata.getNodeId();
+ tx.set(rowId, FluoQueryColumns.AGGREGATION_NODE_ID, rowId);
+ tx.set(rowId, FluoQueryColumns.AGGREGATION_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.AGGREGATION_PARENT_NODE_ID, metadata.getParentNodeId());
+ tx.set(rowId, FluoQueryColumns.AGGREGATION_CHILD_NODE_ID, metadata.getChildNodeId());
+
+ // Store the Group By variable order.
+ final VariableOrder groupByVars = metadata.getGroupByVariableOrder();
+ final String groupByString = Joiner.on(";").join(groupByVars.getVariableOrders());
+ tx.set(rowId, FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES, groupByString);
+
+ // Serialize the collection of AggregationElements.
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try(final ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject( metadata.getAggregations() );
+ } catch (final IOException e) {
+ throw new RuntimeException("Problem encountered while writing AggregationMetadata to the Fluo table. Unable " +
+ "to serialize the AggregationElements to a byte[].", e);
+ }
+ tx.set(Bytes.of(rowId.getBytes(Charsets.UTF_8)), FluoQueryColumns.AGGREGATION_AGGREGATIONS, Bytes.of(baos.toByteArray()));
+ }
+
+ /**
+ * Read an instance of {@link AggregationMetadata} from the Fluo table.
+ *
+ * @param sx - The snapshot that will be used to read the metadata. (not null)
+ * @param nodeId - The nodeId of the Aggregation node that will be read. (not null)
+ * @return The {@link AggregationMetadata} that was read from the table.
+ */
+ public AggregationMetadata readAggregationMetadata(final SnapshotBase sx, final String nodeId) {
+ return readAggregationMetadataBuilder(sx, nodeId).build();
+ }
+
+ private AggregationMetadata.Builder readAggregationMetadataBuilder(final SnapshotBase sx, final String nodeId) {
+ requireNonNull(sx);
+ requireNonNull(nodeId);
+
+ // Fetch the values from the Fluo table.
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId,
+ FluoQueryColumns.AGGREGATION_VARIABLE_ORDER,
+ FluoQueryColumns.AGGREGATION_PARENT_NODE_ID,
+ FluoQueryColumns.AGGREGATION_CHILD_NODE_ID,
+ FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES);
+
+
+ // Return an object holding them.
+ final String varOrderString = values.get(FluoQueryColumns.AGGREGATION_VARIABLE_ORDER);
+ final VariableOrder varOrder = new VariableOrder(varOrderString);
+
+ final String parentNodeId = values.get(FluoQueryColumns.AGGREGATION_PARENT_NODE_ID);
+ final String childNodeId = values.get(FluoQueryColumns.AGGREGATION_CHILD_NODE_ID);
+
+ // Read the Group By variable order if one was present.
+ final String groupByString = values.get(FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES);
+ final VariableOrder groupByVars = groupByString.isEmpty() ? new VariableOrder() : new VariableOrder( groupByString.split(";") );
+
+ // Deserialize the collection of AggregationElements.
+ final Bytes aggBytes = sx.get(Bytes.of(nodeId.getBytes(Charsets.UTF_8)), FluoQueryColumns.AGGREGATION_AGGREGATIONS);
+ final Collection<AggregationElement> aggregations;
+ try(final ObjectInputStream ois = new ObjectInputStream(aggBytes.toInputStream())) {
+ aggregations = (Collection<AggregationElement>)ois.readObject();
+ } catch (final IOException | ClassNotFoundException e) {
+ throw new RuntimeException("Problem encountered while reading AggregationMetadata from the Fluo table. Unable " +
+ "to deserialize the AggregationElements from a byte[].", e);
+ }
+
+ final AggregationMetadata.Builder builder = AggregationMetadata.builder(nodeId)
+ .setVariableOrder(varOrder)
+ .setParentNodeId(parentNodeId)
+ .setChildNodeId(childNodeId)
+ .setGroupByVariableOrder(groupByVars);
+
+ for(final AggregationElement aggregation : aggregations) {
+ builder.addAggregation(aggregation);
+ }
+
+ return builder;
+ }
+
+ /**
* Write an instance of {@link FluoQuery} to the Fluo table.
*
* @param tx - The transaction that will be used to commit the metadata. (not null)
* @param query - The query metadata that will be written to the table. (not null)
*/
public void write(final TransactionBase tx, final FluoQuery query) {
- checkNotNull(tx);
- checkNotNull(query);
+ requireNonNull(tx);
+ requireNonNull(query);
// Store the Query ID so that it may be looked up from the original SPARQL string.
final String sparql = query.getQueryMetadata().getSparql();
@@ -298,6 +395,10 @@ public class FluoQueryMetadataDAO {
for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) {
write(tx, statementPattern);
}
+
+ for(final AggregationMetadata aggregation : query.getAggregationMetadata()) {
+ write(tx, aggregation);
+ }
}
/**
@@ -308,8 +409,8 @@ public class FluoQueryMetadataDAO {
* @return The {@link FluoQuery} that was read from table.
*/
public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) {
- checkNotNull(sx);
- checkNotNull(queryId);
+ requireNonNull(sx);
+ requireNonNull(queryId);
final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
addChildMetadata(sx, fluoQueryBuilder, queryId);
@@ -317,9 +418,9 @@ public class FluoQueryMetadataDAO {
}
private void addChildMetadata(final SnapshotBase sx, final FluoQuery.Builder builder, final String childNodeId) {
- checkNotNull(sx);
- checkNotNull(builder);
- checkNotNull(childNodeId);
+ requireNonNull(sx);
+ requireNonNull(builder);
+ requireNonNull(childNodeId);
final NodeType childType = NodeType.fromNodeId(childNodeId).get();
switch(childType) {
@@ -357,6 +458,15 @@ public class FluoQueryMetadataDAO {
final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId);
builder.addStatementPatternBuilder(spBuilder);
break;
+
+ case AGGREGATION:
+ // Add this node's metadata.
+ final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId);
+ builder.addAggregateMetadata(aggregationBuilder);
+
+ // Add its child's metadata.
+ addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId());
+ break;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 2128700..562470a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.query;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
@@ -29,32 +30,40 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.algebra.AggregateOperator;
+import org.openrdf.query.algebra.Extension;
import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Group;
+import org.openrdf.query.algebra.GroupElem;
import org.openrdf.query.algebra.Join;
import org.openrdf.query.algebra.LeftJoin;
import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.ParsedQuery;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import net.jcip.annotations.Immutable;
+
/**
* Creates the {@link FluoQuery} metadata that is required by the Fluo
* application to process a SPARQL query.
@@ -119,7 +128,7 @@ public class SparqlFluoQueryBuilder {
*/
public Optional<String> getId(final QueryModelNode node) {
checkNotNull(node);
- return Optional.fromNullable( nodeIds.get(node) );
+ return Optional.ofNullable( nodeIds.get(node) );
}
/**
@@ -157,14 +166,15 @@ public class SparqlFluoQueryBuilder {
prefix = JOIN_PREFIX;
} else if(node instanceof Projection) {
prefix = QUERY_PREFIX;
+ } else if(node instanceof Extension) {
+ prefix = AGGREGATION_PREFIX;
} else {
- throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Projection} but was " + node.getClass());
+ throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass());
}
// Create the unique portion of the id.
final String unique = UUID.randomUUID().toString().replaceAll("-", "");
-
// Put them together to create the Node ID.
return prefix + "_" + unique;
}
@@ -204,6 +214,77 @@ public class SparqlFluoQueryBuilder {
this.nodeIds = checkNotNull(nodeIds);
}
+ /**
+ * If we encounter an Extension node that contains a Group, then we've found an aggregation.
+ */
+ @Override
+ public void meet(final Extension node) {
+ final TupleExpr arg = node.getArg();
+ if(arg instanceof Group) {
+ final Group group = (Group) arg;
+
+ // Get the Aggregation Node's id.
+ final String aggregationId = nodeIds.getOrMakeId(node);
+
+ // Get the group's child node id. This call forces it to be a supported child type.
+ final TupleExpr child = group.getArg();
+ final String childNodeId = nodeIds.getOrMakeId( child );
+
+ // Get the list of group by binding names.
+ VariableOrder groupByVariableOrder = null;
+ if(!group.getGroupBindingNames().isEmpty()) {
+ groupByVariableOrder = new VariableOrder(group.getGroupBindingNames());
+ } else {
+ groupByVariableOrder = new VariableOrder();
+ }
+
+ // The aggregations that need to be performed are the Group Elements.
+ final List<AggregationElement> aggregations = new ArrayList<>();
+ for(final GroupElem groupElem : group.getGroupElements()) {
+ // Figure out the type of the aggregation.
+ final AggregateOperator operator = groupElem.getOperator();
+ final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() );
+
+ // If the type is one we support, create the AggregationElement.
+ if(type.isPresent()) {
+ final String resultBindingName = groupElem.getName();
+
+ final AtomicReference<String> aggregatedBindingName = new AtomicReference<>();
+ groupElem.visitChildren(new QueryModelVisitorBase<RuntimeException>() {
+ @Override
+ public void meet(final Var node) {
+ aggregatedBindingName.set( node.getName() );
+ }
+ });
+
+ aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) );
+ }
+ }
+
+ // Update the aggregation's metadata.
+ AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(aggregationId).orNull();
+ if(aggregationBuilder == null) {
+ aggregationBuilder = AggregationMetadata.builder(aggregationId);
+ fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+ }
+
+ aggregationBuilder.setChildNodeId(childNodeId);
+ aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder);
+ for(final AggregationElement aggregation : aggregations) {
+ aggregationBuilder.addAggregation(aggregation);
+ }
+
+ // Update the child node's metadata.
+ final Set<String> childVars = getVars(child);
+ final VariableOrder childVarOrder = new VariableOrder(childVars);
+
+ setChildMetadata(childNodeId, childVarOrder, aggregationId);
+ }
+
+ // Walk to the next node.
+ super.meet(node);
+ }
+
@Override
public void meet(final StatementPattern node) {
// Extract metadata that will be stored from the node.
@@ -386,10 +467,21 @@ public class SparqlFluoQueryBuilder {
filterBuilder.setParentNodeId(parentNodeId);
break;
- case QUERY:
- throw new IllegalArgumentException("QUERY nodes do not have children.");
- default:
- throw new IllegalArgumentException("Unsupported NodeType: " + childType);
+ case AGGREGATION:
+ AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
+ if(aggregationBuilder == null) {
+ aggregationBuilder = AggregationMetadata.builder(childNodeId);
+ fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
+ }
+
+ aggregationBuilder.setVariableOrder(childVarOrder);
+ aggregationBuilder.setParentNodeId(parentNodeId);
+ break;
+
+ case QUERY:
+ throw new IllegalArgumentException("QUERY nodes do not have children.");
+ default:
+ throw new IllegalArgumentException("Unsupported NodeType: " + childType);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java
new file mode 100644
index 0000000..30f026c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * A utility class that defines functions that make it easier to work with {@link BindingSet} objects.
+ */
+public class BindingSetUtil {
+
+ /**
+ * Create a new {@link BindingSet} that only includes the bindings whose names appear within the {@code variableOrder}.
+ * If no binding is found for a variable, then that binding is just omitted from the resulting object.
+ *
+ * @param variableOrder - Defines which bindings will be kept. (not null)
+ * @param bindingSet - Contains the source {@link Binding}s. (not null)
+ * @return A new {@link BindingSet} containing only the specified bindings.
+ */
+ public static BindingSet keepBindings(final VariableOrder variableOrder, final BindingSet bindingSet) {
+ requireNonNull(variableOrder);
+ requireNonNull(bindingSet);
+
+ final MapBindingSet result = new MapBindingSet();
+ for(final String bindingName : variableOrder) {
+ if(bindingSet.hasBinding(bindingName)) {
+ final Binding binding = bindingSet.getBinding(bindingName);
+ result.addBinding(binding);
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java
new file mode 100644
index 0000000..ffb2320
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Charsets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Utility functions used to create the Fluo Row Keys that reference the binding
+ * set results of a query node.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RowKeyUtil {
+
+ private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+
+ /**
+ * Creates the Row Key that will be used by a node within the PCJ Fluo application to represent where a specific
+ * result of that node will be placed.
+ *
+ * @param nodeId - Identifies the Node that the Row Key is for. (not null)
+ * @param variableOrder - Specifies which bindings from {@code bindingSet} will be included within the Row Key as
+ * well as the order they will appear. (not null)
+ * @param bindingSet - The Binding Set whose values will be used to create the Row Key. (not null)
+ * @return A Row Key built using the provided values.
+ */
+ public static Bytes makeRowKey(final String nodeId, final VariableOrder variableOrder, final BindingSet bindingSet) {
+ requireNonNull(nodeId);
+ requireNonNull(variableOrder);
+ requireNonNull(bindingSet);
+
+ // The Row Key starts with the Node ID of the node the result belongs to.
+ String rowId = nodeId + IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+ // Append the String formatted bindings that are included in the Variable Order. The Variable Order also defines
+ // the order the binding will be written to the Row Key. If a Binding is missing for one of the Binding Names
+ // that appears within the Variable Order, then an empty value will be written for that location within the Row Key.
+ rowId += BS_CONVERTER.convert(bindingSet, variableOrder);
+
+ // Format the Row Key as a UTF-8 encoded Bytes object.
+ return Bytes.of( rowId.getBytes(Charsets.UTF_8) );
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
new file mode 100644
index 0000000..99791ee
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Tests the methods of {@link VisibilityBindingSetSerDe}.
+ */
+public class VisibilityBindingSetSerDeTest {
+
+ @Test
+ public void rountTrip() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("name", vf.createLiteral("Alice"));
+ bs.addBinding("age", vf.createLiteral(5));
+ final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u");
+
+ final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe();
+ final Bytes bytes = serde.serialize(original);
+ final VisibilityBindingSet result = serde.deserialize(bytes);
+
+ assertEquals(original, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
index 43dac3c..854798d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
@@ -135,7 +135,7 @@ public class NewQueryCommand implements PcjAdminClientCommand {
// Tell the Fluo PCJ Updater app to maintain the PCJ.
createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, accumulo, ryaTablePrefix);
- } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
+ } catch (MalformedQueryException | PcjException | RyaDAOException e) {
throw new ExecutionException("Could not create and load historic matches into the the Fluo app for the query.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 105f697..c8dc737 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -18,19 +18,29 @@
*/
package org.apache.rya.indexing.pcj.fluo.demo;
+import java.io.IOException;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.commons.lang3.StringUtils;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.mini.MiniFluo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.openrdf.model.Statement;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
@@ -45,16 +55,6 @@ import org.openrdf.sail.SailException;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.api.resolver.RyaToRdfConversions;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-
/**
* Demonstrates historicly added Rya statements that are stored within the core
* Rya tables joining with newly streamed statements into the Fluo application.
@@ -181,7 +181,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
// Tell the Fluo app to maintain it.
new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
- } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
+ } catch (MalformedQueryException | PcjException | RyaDAOException e) {
throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);
}
@@ -192,11 +192,11 @@ public class FluoAndHistoricPcjsDemo implements Demo {
// 5. Show that the Fluo app exported the results to the PCJ table in Accumulo.
log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:");
- try {
- for(final BindingSet result : pcjStorage.listResults(pcjId)) {
- log.info(" " + result);
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ log.info(" " + resultsIt.next());
}
- } catch (final PCJStorageException e) {
+ } catch (final Exception e) {
throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e);
}
waitForEnter();
@@ -257,11 +257,11 @@ public class FluoAndHistoricPcjsDemo implements Demo {
// 8. Show the new results have been exported to the PCJ table in Accumulo.
log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:");
- try {
- for(final BindingSet result : pcjStorage.listResults(pcjId)) {
- log.info(" " + result);
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ log.info(" " + resultsIt.next());
}
- } catch (final PCJStorageException e) {
+ } catch (final Exception e) {
throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e);
}
log.info("");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index ab99ecd..9263362 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -41,6 +41,11 @@
<groupId>org.apache.rya</groupId>
<artifactId>rya.indexing</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-api</artifactId>
+ </dependency>
+
<!-- Testing dependencies. -->
<dependency>
<groupId>org.apache.fluo</groupId>
@@ -86,8 +91,10 @@
</exclusion>
</exclusions>
</dependency>
-
-
-
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-recipes-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
deleted file mode 100644
index 6e696c8..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.accumulo.minicluster.MiniAccumuloConfig;
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.resolver.RyaToRdfConversions;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
-import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.sail.config.RyaSailFactory;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.openrdf.model.Statement;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.MapBindingSet;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.sail.Sail;
-
-import com.google.common.io.Files;
-
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-
-/**
- * Integration tests that ensure the Fluo application processes PCJs results
- * correctly.
- * <p>
- * This class is being ignored because it doesn't contain any unit tests.
- */
-public abstract class ITBase {
- private static final Logger log = Logger.getLogger(ITBase.class);
-
- // Rya data store and connections.
- protected static final String RYA_INSTANCE_NAME = "demo_";
- protected RyaSailRepository ryaRepo = null;
- protected RepositoryConnection ryaConn = null;
-
- // Mini Accumulo Cluster
- protected static final String ACCUMULO_USER = "root";
- protected static final String ACCUMULO_PASSWORD = "password";
- protected MiniAccumuloCluster cluster;
- protected static Connector accumuloConn = null;
- protected String instanceName = null;
- protected String zookeepers = null;
-
- // Fluo data store and connections.
- protected static final String FLUO_APP_NAME = "IntegrationTests";
- protected MiniFluo fluo = null;
- protected FluoClient fluoClient = null;
-
- @BeforeClass
- public static void killLoudLogs() {
- Logger.getRootLogger().setLevel(Level.ERROR);
- Logger.getLogger(ClientCnxn.class).setLevel(Level.OFF);
- }
-
- @Before
- public void setupMiniResources() throws Exception {
- // Will set defaults for log4J
- org.apache.log4j.BasicConfigurator.configure();
- // Initialize the Mini Accumulo that will be used to host Rya and Fluo.
- setupMiniAccumulo();
-
- // Initialize the Mini Fluo that will be used to store created queries.
- fluo = startMiniFluo();
- fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
-
- // Initialize the Rya that will be used by the tests.
- ryaRepo = setupRya(instanceName, zookeepers);
- ryaConn = ryaRepo.getConnection();
- }
-
- @After
- public void shutdownMiniResources() {
- if (ryaConn != null) {
- try {
- log.info("Shutting down Rya Connection.");
- ryaConn.close();
- log.info("Rya Connection shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Rya Connection.", e);
- }
- }
-
- if (ryaRepo != null) {
- try {
- log.info("Shutting down Rya Repo.");
- ryaRepo.shutDown();
- log.info("Rya Repo shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Rya Repo.", e);
- }
- }
-
- if (fluoClient != null) {
- try {
- log.info("Shutting down Fluo Client.");
- fluoClient.close();
- log.info("Fluo Client shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Fluo Client.", e);
- }
- }
-
- if (fluo != null) {
- try {
- log.info("Shutting down Mini Fluo.");
- fluo.close();
- log.info("Mini Fluo shut down.");
- } catch (final Exception e) {
- log.error("Could not shut down the Mini Fluo.", e);
- }
- }
-
- if(cluster != null) {
- try {
- log.info("Shutting down the Mini Accumulo being used as a Rya store.");
- cluster.stop();
- log.info("Mini Accumulo being used as a Rya store shut down.");
- } catch(final Exception e) {
- log.error("Could not shut down the Mini Accumulo.", e);
- }
- }
- }
-
- /**
- * A helper fuction for creating a {@link BindingSet} from an array of
- * {@link Binding}s.
- *
- * @param bindings - The bindings to include in the set. (not null)
- * @return A {@link BindingSet} holding the bindings.
- */
- protected static BindingSet makeBindingSet(final Binding... bindings) {
- final MapBindingSet bindingSet = new MapBindingSet();
- for (final Binding binding : bindings) {
- bindingSet.addBinding(binding);
- }
- return bindingSet;
- }
-
- /**
- * A helper function for creating a {@link RyaStatement} that represents a
- * Triple.
- *
- * @param subject - The Subject of the Triple. (not null)
- * @param predicate - The Predicate of the Triple. (not null)
- * @param object - The Object of the Triple. (not null)
- * @return A Triple as a {@link RyaStatement}.
- */
- protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) {
- checkNotNull(subject);
- checkNotNull(predicate);
- checkNotNull(object);
-
- final RyaStatementBuilder builder = RyaStatement.builder().setSubject(new RyaURI(subject))
- .setPredicate(new RyaURI(predicate));
-
- if (object.startsWith("http://") || object.startsWith("tag:")) {
- builder.setObject(new RyaURI(object));
- } else {
- builder.setObject(new RyaType(object));
- }
-
- return builder.build();
- }
-
- /**
- * A helper function for creating a {@link RyaStatement} that represents a Triple.
- * This overload takes a typed literal for the object. Prepare it like this for example specify the type (wktLiteral) and the value (Point...):
- * makeRyaStatement(s, p, new RyaType(new URIImpl("http://www.opengis.net/ont/geosparql#wktLiteral"), "Point(-77.03524 38.889468)")) //
- *
- * @param subject - The Subject of the Triple. (not null)
- * @param predicate - The Predicate of the Triple. (not null)
- * @param object - The Object of the Triple. (not null)
- * @return A Triple as a {@link RyaStatement}.
- */
- protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final RyaType object) {
- checkNotNull(subject);
- checkNotNull(predicate);
- checkNotNull(object);
-
- final RyaStatementBuilder builder = RyaStatement.builder()//
- .setSubject(new RyaURI(subject))//
- .setPredicate(new RyaURI(predicate))//
- .setObject(object);
- return builder.build();
- }
-
- /**
- * A helper function for creating a {@link RyaStatement} that represents a Triple with an integer.
- *
- * @param subject - The Subject of the Triple. (not null)
- * @param predicate - The Predicate of the Triple. (not null)
- * @param object - The Object of the Triple, an integer value (int).
- * @return A Triple as a {@link RyaStatement}.
- */
- protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) {
- checkNotNull(subject);
- checkNotNull(predicate);
-
- return RyaStatement.builder().setSubject(new RyaURI(subject)).setPredicate(new RyaURI(predicate))
- .setObject(new RyaType(XMLSchema.INT, "" + object)).build();
- }
-
- /**
- * A helper function for creating a Sesame {@link Statement} that represents
- * a Triple..
- *
- * @param subject - The Subject of the Triple. (not null)
- * @param predicate - The Predicate of the Triple. (not null)
- * @param object - The Object of the Triple. (not null)
- * @return A Triple as a {@link Statement}.
- */
- protected static Statement makeStatement(final String subject, final String predicate, final String object) {
- checkNotNull(subject);
- checkNotNull(predicate);
- checkNotNull(object);
-
- final RyaStatement ryaStmt = makeRyaStatement(subject, predicate, object);
- return RyaToRdfConversions.convertStatement(ryaStmt);
- }
-
- /**
- * Fetches the binding sets that are the results of a specific SPARQL query from the Fluo table.
- *
- * @param fluoClient- A connection to the Fluo table where the results reside. (not null)
- * @param sparql - This query's results will be fetched. (not null)
- * @return The binding sets for the query's results.
- */
- protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) {
- final Set<BindingSet> bindingSets = new HashSet<>();
-
- try (Snapshot snapshot = fluoClient.newSnapshot()) {
- final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString();
-
- // Fetch the query's variable order.
- final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
- final VariableOrder varOrder = queryMetadata.getVariableOrder();
-
- CellScanner cellScanner = snapshot.scanner().fetch(FluoQueryColumns.QUERY_BINDING_SET).build();
- final BindingSetStringConverter converter = new BindingSetStringConverter();
-
- Iterator<RowColumnValue> iter = cellScanner.iterator();
-
- while (iter.hasNext()) {
- final String bindingSetString = iter.next().getsValue();
- final BindingSet bindingSet = converter.convert(bindingSetString, varOrder);
- bindingSets.add(bindingSet);
- }
- }
-
- return bindingSets;
- }
-
- private void setupMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
- final File miniDataDir = Files.createTempDir();
-
- // Setup and start the Mini Accumulo.
- final MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, ACCUMULO_PASSWORD);
- cluster = new MiniAccumuloCluster(cfg);
- cluster.start();
-
- // Store a connector to the Mini Accumulo.
- instanceName = cluster.getInstanceName();
- zookeepers = cluster.getZooKeepers();
-
- final Instance instance = new ZooKeeperInstance(instanceName, zookeepers);
- accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD));
- }
-
- /**
- * Sets up a Rya instance.
- */
- protected static RyaSailRepository setupRya(final String instanceName, final String zookeepers) throws Exception {
- checkNotNull(instanceName);
- checkNotNull(zookeepers);
-
- // Install the Rya instance to the mini accumulo cluster.
- final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(
- ACCUMULO_USER,
- ACCUMULO_PASSWORD.toCharArray(),
- instanceName,
- zookeepers), accumuloConn);
-
- ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
- .setEnableTableHashPrefix(false)
- .setEnableFreeTextIndex(true)
- .setEnableEntityCentricIndex(true)
- .setEnableGeoIndex(true)
- .setEnableTemporalIndex(true)
- .setEnablePcjIndex(true)
- .setFluoPcjAppName(FLUO_APP_NAME)
- .build());
-
- // Connect to the Rya instance that was just installed.
- final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
- final Sail sail = RyaSailFactory.getInstance(conf);
- final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
- return ryaRepo;
- }
-
- protected static AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
- final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix(RYA_INSTANCE_NAME);
- // Accumulo connection information.
- conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
- conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
- conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
- conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers);
- conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "");
- // PCJ configuration information.
- conf.set(ConfigUtils.USE_PCJ, "true");
- conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
- conf.set(ConfigUtils.FLUO_APP_NAME, FLUO_APP_NAME);
- conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
- PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
- conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
- PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
-
- return conf;
- }
-
- protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
- // Setup the observers that will be used by the Fluo PCJ Application.
- final List<ObserverSpecification> observers = new ArrayList<>();
- observers.add(new ObserverSpecification(TripleObserver.class.getName()));
- observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
- observers.add(new ObserverSpecification(JoinObserver.class.getName()));
- observers.add(new ObserverSpecification(FilterObserver.class.getName()));
-
- // Set export details for exporting from Fluo to a Rya repository and a subscriber queue.
- final HashMap<String, String> exportParams = new HashMap<>();
- setExportParameters(exportParams);
-
- // Configure the export observer to export new PCJ results to the mini accumulo cluster.
- final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
- observers.add(exportObserverConfig);
-
- // Configure how the mini fluo will run.
- final FluoConfiguration config = new FluoConfiguration();
- config.setMiniStartAccumulo(false);
- config.setAccumuloInstance(instanceName);
- config.setAccumuloUser(ACCUMULO_USER);
- config.setAccumuloPassword(ACCUMULO_PASSWORD);
- config.setInstanceZookeepers(zookeepers + "/fluo");
- config.setAccumuloZookeepers(zookeepers);
-
- config.setApplicationName(FLUO_APP_NAME);
- config.setAccumuloTable("fluo" + FLUO_APP_NAME);
-
- config.addObservers(observers);
-
- FluoFactory.newAdmin(config).initialize(new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
- return FluoFactory.newMiniFluo(config);
- }
-
- /**
- * Set export details for exporting from Fluo to a Rya repository and a subscriber queue.
- * Override this if you have custom export destinations.
- *
- * @param exportParams
- */
- protected void setExportParameters(final HashMap<String, String> exportParams) {
- final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
- ryaParams.setExportToRya(true);
- ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
- ryaParams.setAccumuloInstanceName(instanceName);
- ryaParams.setZookeeperServers(zookeepers);
- ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
- ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
new file mode 100644
index 0000000..cd84cb9
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.sail.Sail;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * The base Integration Test class used for Fluo applications that export to a Kafka topic.
+ */
+public class KafkaExportITBase extends AccumuloExportITBase {
+
+ protected static final String RYA_INSTANCE_NAME = "test_";
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private ZkUtils zkUtils;
+ private KafkaServer kafkaServer;
+ private EmbeddedZookeeper zkServer;
+ private ZkClient zkClient;
+
+ // The Rya instance that statements are written to so that they will be fed into the Fluo app.
+ private RyaSailRepository ryaSailRepo = null;
+
+ /**
+ * Add info about the Kafka queue/topic to receive the export.
+ *
+ * @see org.apache.fluo.recipes.test.AccumuloExportITBase#preFluoInitHook()
+ */
+ @Override
+ protected void preFluoInitHook() throws Exception {
+ // Setup the observers that will be used by the Fluo PCJ Application.
+ final List<ObserverSpecification> observers = new ArrayList<>();
+ observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+ observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+ observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+ observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+ observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+
+ // Configure the export observer to export new PCJ results to Kafka.
+ final HashMap<String, String> exportParams = new HashMap<>();
+
+ final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
+ kafkaParams.setExportToKafka(true);
+
+ // Configure the Kafka Producer
+ final Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+ kafkaParams.addAllProducerConfig(producerConfig);
+
+ final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
+ observers.add(exportObserverConfig);
+
+ // Add the observers to the Fluo Configuration.
+ super.getFluoConfiguration().addObservers(observers);
+ }
+
+ /**
+ * Installs a Rya instance on the Accumulo cluster, then starts the embedded Zookeeper and Kafka broker.
+ *
+ * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources()
+ */
+ @Before
+ public void setupKafka() throws Exception {
+ // Install an instance of Rya on the Accumulo cluster.
+ installRyaInstance();
+
+ // Setup Kafka.
+ zkServer = new EmbeddedZookeeper();
+ final String zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ final Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ final KafkaConfig config = new KafkaConfig(brokerProps);
+ final Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+ }
+
+    /**
+     * Uninstalls the test Rya instance from the mini Accumulo cluster and shuts down
+     * the Sail repository that was opened while installing it. JUnit runs this after
+     * every test.
+     * <p>
+     * The repository is shut down in a {@code finally} block so that a failing
+     * uninstall does not leak it, and the shutdown is skipped when the repository
+     * was never created (for example because the install itself failed).
+     *
+     * @throws Exception if the uninstall or the repository shutdown fails
+     */
+    @After
+    public void teardownRya() throws Exception {
+        try {
+            final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+            final String instanceName = cluster.getInstanceName();
+            final String zookeepers = cluster.getZooKeepers();
+
+            // Uninstall the instance of Rya.
+            final RyaClient ryaClient = AccumuloRyaClientFactory.build(
+                    new AccumuloConnectionDetails(
+                            ACCUMULO_USER,
+                            ACCUMULO_PASSWORD.toCharArray(),
+                            instanceName,
+                            zookeepers),
+                    super.getAccumuloConnector());
+
+            ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
+        } finally {
+            // Shutdown the repo, even when the uninstall above threw.
+            if (ryaSailRepo != null) {
+                ryaSailRepo.shutDown();
+            }
+        }
+    }
+
+    /**
+     * Installs a Rya instance named {@code RYA_INSTANCE_NAME} on the mini Accumulo
+     * cluster with only the PCJ index enabled, then opens the Sail repository that
+     * is connected to it and stores it in {@code ryaSailRepo}.
+     *
+     * @throws Exception if the install fails or the Sail cannot be created
+     */
+    private void installRyaInstance() throws Exception {
+        final MiniAccumuloCluster miniCluster = super.getMiniAccumuloCluster();
+        final String accumuloInstance = miniCluster.getInstanceName();
+        final String zookeeperServers = miniCluster.getZooKeepers();
+
+        // Build the client used to administer Rya on the mini accumulo cluster.
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(),
+                accumuloInstance,
+                zookeeperServers);
+        final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getAccumuloConnector());
+
+        // Only the PCJ index is switched on; it is updated by this test's Fluo application.
+        final InstallConfiguration installConfig = InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableGeoIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(true)
+                .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() )
+                .build();
+        client.getInstall().install(RYA_INSTANCE_NAME, installConfig);
+
+        // Open a repository on the instance that was just installed.
+        final AccumuloRdfConfiguration ryaConf = makeConfig(accumuloInstance, zookeeperServers);
+        ryaSailRepo = new RyaSailRepository(RyaSailFactory.getInstance(ryaConf));
+    }
+
+    /**
+     * Builds the configuration used to connect a Sail to the test Rya instance.
+     * <p>
+     * NOTE(review): neither parameter is read by this implementation; the Accumulo
+     * instance name and Zookeepers are taken from the Accumulo connector instead.
+     *
+     * @param instanceName the Accumulo instance name (currently unused)
+     * @param zookeepers the Zookeeper servers (currently unused)
+     * @return a configuration that has PCJ indexing enabled, Accumulo PCJ storage and the Fluo updater
+     */
+    protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
+        final String fluoAppName = super.getFluoConfiguration().getApplicationName();
+        final String pcjStorageType = PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString();
+        final String pcjUpdaterType = PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString();
+
+        final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+        ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
+
+        // How to reach Accumulo.
+        ryaConf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
+        ryaConf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
+        ryaConf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
+        ryaConf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
+        ryaConf.setAuths("");
+
+        // How PCJs are stored and kept up to date.
+        ryaConf.set(ConfigUtils.USE_PCJ, "true");
+        ryaConf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
+        ryaConf.set(ConfigUtils.FLUO_APP_NAME, fluoAppName);
+        ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, pcjStorageType);
+        ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, pcjUpdaterType);
+
+        ryaConf.setDisplayQueryPlan(true);
+
+        return ryaConf;
+    }
+
+    /**
+     * Gives tests access to the repository opened by {@code installRyaInstance()}.
+     *
+     * @return The {@link RyaSailRepository} connected to the Rya instance that statements are loaded into;
+     *   {@code null} if it has not been assigned yet.
+     */
+    protected RyaSailRepository getRyaSailRepository() throws Exception {
+        return this.ryaSailRepo;
+    }
+
+    /**
+     * Shuts down the embedded Kafka broker, the Zookeeper client and the embedded
+     * Zookeeper server. JUnit runs this after every test.
+     * <p>
+     * Each resource is released in its own {@code finally} step so that a failure
+     * while stopping one of them does not leak the others. Resources that were
+     * never created, because {@code setupKafka()} failed part way through, are skipped.
+     */
+    @After
+    public void teardownKafka() {
+        try {
+            if (kafkaServer != null) {
+                kafkaServer.shutdown();
+            }
+        } finally {
+            try {
+                if (zkClient != null) {
+                    zkClient.close();
+                }
+            } finally {
+                if (zkServer != null) {
+                    zkServer.shutdown();
+                }
+            }
+        }
+    }
+
+    /**
+     * Test kafka without rya code to make sure kafka works in this environment.
+     * If this test fails then it's a testing environment issue, not with Rya.
+     * Source: https://github.com/asmaier/mini-kafka
+     * <p>
+     * The producer and the consumer are opened with try-with-resources so both are
+     * closed even when an assertion or a Kafka call fails.
+     *
+     * @throws Exception if the topic cannot be created or a Kafka client fails
+     */
+    @Test
+    public void embeddedKafkaTest() throws Exception {
+        // create topic
+        final String topic = "testTopic";
+        AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+        // setup producer
+        final Properties producerProps = new Properties();
+        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+        // setup consumer
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        consumerProps.setProperty("group.id", "group0");
+        consumerProps.setProperty("client.id", "consumer0");
+        consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        // to make sure the consumer starts from the beginning of the topic
+        consumerProps.put("auto.offset.reset", "earliest");
+
+        try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
+            consumer.subscribe(Arrays.asList(topic));
+
+            // send message; leaving the inner block closes the producer before the consumer polls
+            final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8));
+            try (final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
+                producer.send(data);
+            }
+
+            // starting consumer
+            final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
+            assertEquals(1, records.count());
+            final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
+            final ConsumerRecord<Integer, byte[]> record = recordIterator.next();
+            assertEquals(42, (int) record.key());
+            assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
+        }
+    }
+
+    /**
+     * Creates a Kafka consumer that is subscribed to a single topic on the embedded
+     * broker and starts reading at the earliest available offset.
+     * <p>
+     * NOTE(review): the value deserializer is configured with the
+     * {@code KryoVisibilityBindingSetSerializer} class name; presumably that class
+     * also implements Kafka's deserializer interface - confirm.
+     *
+     * @param TopicName the topic the returned consumer subscribes to
+     * @return the subscribed consumer; the caller is responsible for closing it
+     */
+    protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) {
+        final String brokerAddress = BROKERHOST + ":" + BROKERPORT;
+
+        final Properties settings = new Properties();
+        settings.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
+        settings.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        settings.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        settings.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+        settings.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+
+        // Read the topic from its beginning rather than from the latest offset.
+        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final KafkaConsumer<Integer, VisibilityBindingSet> subscribedConsumer = new KafkaConsumer<>(settings);
+        subscribedConsumer.subscribe(Arrays.asList(TopicName));
+        return subscribedConsumer;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
new file mode 100644
index 0000000..5fe999f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.openrdf.sail.Sail;
+
+/**
+ * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index.
+ * <p>
+ * Before each test a Rya instance named {@link #RYA_INSTANCE_NAME} is installed on the
+ * mini Accumulo cluster and a {@link RyaSailRepository} is opened on it; after each test
+ * the instance is uninstalled and the repository is shut down.
+ */
+public class RyaExportITBase extends AccumuloExportITBase {
+
+    /** The name of the Rya instance the tests install; also used as the table prefix. */
+    protected static final String RYA_INSTANCE_NAME = "test_";
+
+    /** Opened by {@link #setupRya()}; stays {@code null} when that setup did not complete. */
+    private RyaSailRepository ryaSailRepo = null;
+
+    public RyaExportITBase() {
+        // Indicates that MiniFluo should be started before each test.
+        super(true);
+    }
+
+    /**
+     * Configures log4j with its basic configuration and limits the root logger to errors.
+     */
+    @BeforeClass
+    public static void setupLogging() {
+        BasicConfigurator.configure();
+        Logger.getRootLogger().setLevel(Level.ERROR);
+    }
+
+    /**
+     * Registers the observers of the Fluo PCJ application, including a
+     * {@link QueryResultObserver} configured to export results to the test Rya instance.
+     */
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Setup the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+
+        // Configure the export observer to export new PCJ results to the mini accumulo cluster.
+        final HashMap<String, String> exportParams = new HashMap<>();
+        final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
+        ryaParams.setExportToRya(true);
+        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
+        ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
+        ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
+        ryaParams.setExporterUsername(ACCUMULO_USER);
+        ryaParams.setExporterPassword(ACCUMULO_PASSWORD);
+
+        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
+        observers.add(exportObserverConfig);
+
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
+    }
+
+    /**
+     * Installs the test Rya instance, with only the PCJ index enabled, on the mini
+     * Accumulo cluster and opens the Sail repository connected to it.
+     *
+     * @throws Exception if the install fails or the Sail cannot be created
+     */
+    @Before
+    public void setupRya() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        final String instanceName = cluster.getInstanceName();
+        final String zookeepers = cluster.getZooKeepers();
+
+        // Install the Rya instance to the mini accumulo cluster.
+        final RyaClient ryaClient = createRyaClient();
+
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
+                .setEnableTableHashPrefix(false)
+                .setEnableFreeTextIndex(false)
+                .setEnableEntityCentricIndex(false)
+                .setEnableGeoIndex(false)
+                .setEnableTemporalIndex(false)
+                .setEnablePcjIndex(true)
+                .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() )
+                .build());
+
+        // Connect to the Rya instance that was just installed.
+        final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers);
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        ryaSailRepo = new RyaSailRepository(sail);
+    }
+
+    /**
+     * Uninstalls the test Rya instance and shuts down the Sail repository.
+     * <p>
+     * The repository is shut down in a {@code finally} block so that a failing
+     * uninstall does not leak it, and the shutdown is skipped when the repository
+     * was never created because {@link #setupRya()} failed.
+     *
+     * @throws Exception if the uninstall or the repository shutdown fails
+     */
+    @After
+    public void teardownRya() throws Exception {
+        try {
+            // Uninstall the instance of Rya.
+            createRyaClient().getUninstall().uninstall(RYA_INSTANCE_NAME);
+        } finally {
+            // Shutdown the repo, even when the uninstall above threw.
+            if (ryaSailRepo != null) {
+                ryaSailRepo.shutDown();
+            }
+        }
+    }
+
+    /**
+     * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into.
+     */
+    protected RyaSailRepository getRyaSailRepository() throws Exception {
+        return ryaSailRepo;
+    }
+
+    /**
+     * Builds the configuration used to connect a Sail to the test Rya instance.
+     * <p>
+     * NOTE(review): neither parameter is read by this implementation; the Accumulo
+     * instance name and Zookeepers are taken from the Accumulo connector instead.
+     *
+     * @param instanceName the Accumulo instance name (currently unused)
+     * @param zookeepers the Zookeeper servers (currently unused)
+     * @return a configuration that has PCJ indexing enabled, Accumulo PCJ storage and the Fluo updater
+     */
+    protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) {
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(RYA_INSTANCE_NAME);
+
+        // Accumulo connection information.
+        conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
+        conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
+        conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
+        conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
+        conf.setAuths("");
+
+        // PCJ configuration information.
+        conf.set(ConfigUtils.USE_PCJ, "true");
+        conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
+        conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
+                PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
+                PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+
+        conf.setDisplayQueryPlan(true);
+
+        return conf;
+    }
+
+    /**
+     * Builds a {@link RyaClient} for the mini Accumulo cluster using the test
+     * user's credentials. Shared by the install and uninstall steps.
+     *
+     * @return a client that administers Rya instances on the mini Accumulo cluster
+     * @throws Exception if the cluster, the connector or the client cannot be obtained
+     */
+    private RyaClient createRyaClient() throws Exception {
+        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
+        return AccumuloRyaClientFactory.build(
+                new AccumuloConnectionDetails(
+                        ACCUMULO_USER,
+                        ACCUMULO_PASSWORD.toCharArray(),
+                        cluster.getInstanceName(),
+                        cluster.getZooKeepers()),
+                super.getAccumuloConnector());
+    }
+}
\ No newline at end of file