You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/05/13 13:13:12 UTC
[2/3] incubator-rya git commit: RYA-53 Added visibility support
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index bc558a5..700d0fb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -37,6 +37,7 @@ import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.MapBindingSet;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -50,10 +51,11 @@ import io.fluo.api.iterator.ColumnIterator;
import io.fluo.api.iterator.RowIterator;
import io.fluo.api.types.Encoder;
import io.fluo.api.types.StringEncoder;
-import mvm.rya.indexing.external.tupleSet.BindingSetConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
/**
* Updates the results of a Join node when one of its children has added a
@@ -62,7 +64,8 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
@ParametersAreNonnullByDefault
public class JoinResultUpdater {
- private static final BindingSetConverter<String> converter = new BindingSetStringConverter();
+ private static final BindingSetStringConverter idConverter = new BindingSetStringConverter();
+ private static final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter();
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
private final Encoder encoder = new StringEncoder();
@@ -80,7 +83,7 @@ public class JoinResultUpdater {
public void updateJoinResults(
final TransactionBase tx,
final String childId,
- final BindingSet childBindingSet,
+ final VisibilityBindingSet childBindingSet,
final JoinMetadata joinMetadata) throws BindingSetConversionException {
checkNotNull(tx);
checkNotNull(childId);
@@ -113,10 +116,10 @@ public class JoinResultUpdater {
}
// Iterates over the sibling node's BindingSets that join with the new binding set.
- FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx);
+ final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childId, childBindingSet, siblingId, tx);
// Iterates over the resulting BindingSets from the join.
- final Iterator<BindingSet> newJoinResults;
+ final Iterator<VisibilityBindingSet> newJoinResults;
if(emittingSide == Side.LEFT) {
newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets);
} else {
@@ -124,14 +127,15 @@ public class JoinResultUpdater {
}
// Insert the new join binding sets to the Fluo table.
- VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
+ final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
while(newJoinResults.hasNext()) {
- BindingSet newJoinResult = newJoinResults.next();
- String joinBindingSetString = converter.convert(newJoinResult, joinVarOrder);
+ final BindingSet newJoinResult = newJoinResults.next();
+ final String joinBindingSetStringId = idConverter.convert(newJoinResult, joinVarOrder);
+ final String joinBindingSetStringValue = valueConverter.convert(newJoinResult, joinVarOrder);
- final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetString);
+ final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId);
final Column col = FluoQueryColumns.JOIN_BINDING_SET;
- final Bytes value = encoder.encode(joinBindingSetString);
+ final Bytes value = encoder.encode(joinBindingSetStringValue);
tx.set(row, col, value);
}
}
@@ -143,14 +147,14 @@ public class JoinResultUpdater {
LEFT, RIGHT;
}
- private FluoTableIterator makeSiblingScanIterator(String childId, BindingSet childBindingSet, String siblingId, TransactionBase tx) throws BindingSetConversionException {
+ private FluoTableIterator makeSiblingScanIterator(final String childId, final BindingSet childBindingSet, final String siblingId, final TransactionBase tx) throws BindingSetConversionException {
// Get the common variable orders. These are used to build the prefix.
final VariableOrder childVarOrder = getVarOrder(tx, childId);
final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
- List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
+ final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
// Get the Binding strings
- final String childBindingSetString = converter.convert(childBindingSet, childVarOrder);
+ final String childBindingSetString = valueConverter.convert(childBindingSet, childVarOrder);
final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingSetString);
// Create the prefix that will be used to scan for binding sets of the sibling node.
@@ -285,20 +289,20 @@ public class JoinResultUpdater {
public static interface IterativeJoin {
/**
- * Invoked when a new {@link BindingSet} is emitted from the left child
+ * Invoked when a new {@link VisibilityBindingSet} is emitted from the left child
* node of the join. The Fluo table is scanned for results on the right
* side that will be joined with the new result.
*
- * @param newLeftResult - A new BindingSet that has been emitted from
+ * @param newLeftResult - A new VisibilityBindingSet that has been emitted from
* the left child node.
* @param rightResults - The right child node's binding sets that will
* be joined with the new left result. (not null)
* @return The new BindingSet results for the join.
*/
- public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults);
+ public Iterator<VisibilityBindingSet> newLeftResult(VisibilityBindingSet newLeftResult, Iterator<VisibilityBindingSet> rightResults);
/**
- * Invoked when a new {@link BindingSet} is emitted from the right child
+ * Invoked when a new {@link VisibilityBindingSet} is emitted from the right child
* node of the join. The Fluo table is scanned for results on the left
* side that will be joined with the new result.
*
@@ -308,7 +312,7 @@ public class JoinResultUpdater {
* the right child node.
* @return The new BindingSet results for the join.
*/
- public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult);
+ public Iterator<VisibilityBindingSet> newRightResult(Iterator<VisibilityBindingSet> leftResults, VisibilityBindingSet newRightResult);
}
/**
@@ -321,7 +325,7 @@ public class JoinResultUpdater {
*/
public static final class NaturalJoin implements IterativeJoin {
@Override
- public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) {
+ public Iterator<VisibilityBindingSet> newLeftResult(final VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> rightResults) {
checkNotNull(newLeftResult);
checkNotNull(rightResults);
@@ -330,7 +334,7 @@ public class JoinResultUpdater {
}
@Override
- public Iterator<BindingSet> newRightResult(Iterator<BindingSet> leftResults, BindingSet newRightResult) {
+ public Iterator<VisibilityBindingSet> newRightResult(final Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet newRightResult) {
checkNotNull(leftResults);
checkNotNull(newRightResult);
@@ -349,14 +353,14 @@ public class JoinResultUpdater {
*/
public static final class LeftOuterJoin implements IterativeJoin {
@Override
- public Iterator<BindingSet> newLeftResult(BindingSet newLeftResult, Iterator<BindingSet> rightResults) {
+ public Iterator<VisibilityBindingSet> newLeftResult(final VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> rightResults) {
checkNotNull(newLeftResult);
checkNotNull(rightResults);
// If the required portion does not join with any optional portions,
// then emit a BindingSet that matches the new left result.
if(!rightResults.hasNext()) {
- return Lists.<BindingSet>newArrayList(newLeftResult).iterator();
+ return Lists.<VisibilityBindingSet>newArrayList(newLeftResult).iterator();
}
// Otherwise, return an iterator that holds the new required result
@@ -365,7 +369,7 @@ public class JoinResultUpdater {
}
@Override
- public Iterator<BindingSet> newRightResult(final Iterator<BindingSet> leftResults, final BindingSet newRightResult) {
+ public Iterator<VisibilityBindingSet> newRightResult(final Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet newRightResult) {
checkNotNull(leftResults);
checkNotNull(newRightResult);
@@ -382,10 +386,10 @@ public class JoinResultUpdater {
* This is done lazily so that you don't have to load all of the BindingSets
* into memory at once.
*/
- private static final class LazyJoiningIterator implements Iterator<BindingSet> {
+ private static final class LazyJoiningIterator implements Iterator<VisibilityBindingSet> {
- private final BindingSet newResult;
- private final Iterator<BindingSet> joinedResults;
+ private final VisibilityBindingSet newResult;
+ private final Iterator<VisibilityBindingSet> joinedResults;
/**
* Constructs an instance of {@link LazyJoiningIterator}.
@@ -393,9 +397,9 @@ public class JoinResultUpdater {
* @param newResult - A binding set that will be joined with some other binding sets. (not null)
* @param joinResults - The binding sets that will be joined with {@code newResult}. (not null)
*/
- public LazyJoiningIterator(BindingSet newResult, Iterator<BindingSet> joinResults) {
+ public LazyJoiningIterator(final VisibilityBindingSet newResult, final Iterator<VisibilityBindingSet> joinResults) {
this.newResult = checkNotNull(newResult);
- this.joinedResults = checkNotNull(joinResults);
+ joinedResults = checkNotNull(joinResults);
}
@Override
@@ -404,18 +408,28 @@ public class JoinResultUpdater {
}
@Override
- public BindingSet next() {
+ public VisibilityBindingSet next() {
final MapBindingSet bs = new MapBindingSet();
- for(Binding binding : newResult) {
+ for(final Binding binding : newResult) {
bs.addBinding(binding);
}
- for(Binding binding : joinedResults.next()) {
+ final VisibilityBindingSet joinResult = joinedResults.next();
+ for(final Binding binding : joinResult) {
bs.addBinding(binding);
}
- return bs;
+ String visibility = "";
+ final Joiner join = Joiner.on(")&(");
+ final String leftVisi = newResult.getVisibility();
+ final String rightVisi = joinResult.getVisibility();
+ if(leftVisi.isEmpty() || rightVisi.isEmpty()) {
+ visibility = (leftVisi + rightVisi).trim();
+ } else {
+ visibility = "(" + join.join(leftVisi, rightVisi) + ")";
+ }
+ return new VisibilityBindingSet(bs, visibility);
}
@Override
@@ -428,7 +442,7 @@ public class JoinResultUpdater {
* Iterates over rows that have a Binding Set column and returns the unmarshalled
* {@link BindingSet}s.
*/
- private static final class FluoTableIterator implements Iterator<BindingSet> {
+ private static final class FluoTableIterator implements Iterator<VisibilityBindingSet> {
private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet(
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET,
@@ -445,7 +459,7 @@ public class JoinResultUpdater {
* @param varOrder - The Variable Order of binding sets that will be
* read from the Fluo Table. (not null)
*/
- public FluoTableIterator(RowIterator rows, VariableOrder varOrder) {
+ public FluoTableIterator(final RowIterator rows, final VariableOrder varOrder) {
this.rows = checkNotNull(rows);
this.varOrder = checkNotNull(varOrder);
}
@@ -456,7 +470,7 @@ public class JoinResultUpdater {
}
@Override
- public BindingSet next() {
+ public VisibilityBindingSet next() {
final ColumnIterator columns = rows.next().getValue();
while(columns.hasNext()) {
@@ -464,11 +478,7 @@ public class JoinResultUpdater {
final Entry<Column, Bytes> entry = columns.next();
if(BINDING_SET_COLUMNS.contains(entry.getKey())) {
final String bindingSetString = entry.getValue().toString();
- try {
- return converter.convert(bindingSetString, varOrder);
- } catch (BindingSetConversionException e) {
- throw new RuntimeException("Could not convert one of the stored BindingSets from a String: " + bindingSetString, e);
- }
+ return (VisibilityBindingSet) valueConverter.convert(bindingSetString, varOrder);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index f3ff089..8e0a6fe 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -26,7 +26,6 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.MapBindingSet;
import io.fluo.api.client.TransactionBase;
@@ -34,8 +33,10 @@ import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.types.Encoder;
import io.fluo.api.types.StringEncoder;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
/**
* Updates the results of a Query node when one of its children has added a
@@ -43,10 +44,10 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
*/
@ParametersAreNonnullByDefault
public class QueryResultUpdater {
-
private final Encoder encoder = new StringEncoder();
private final BindingSetStringConverter converter = new BindingSetStringConverter();
+ private final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter();
/**
* Updates the results of a Query node when one of its children has added a
@@ -58,7 +59,7 @@ public class QueryResultUpdater {
*/
public void updateQueryResults(
final TransactionBase tx,
- final BindingSet childBindingSet,
+ final VisibilityBindingSet childBindingSet,
final QueryMetadata queryMetadata) {
checkNotNull(tx);
checkNotNull(childBindingSet);
@@ -75,11 +76,12 @@ public class QueryResultUpdater {
}
}
final String queryBindingSetString = converter.convert(queryBindingSet, queryVarOrder);
+ final String queryBindingSetValueString = valueConverter.convert(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()), queryVarOrder);
// Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry.
final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString);
final Column col = FluoQueryColumns.QUERY_BINDING_SET;
- final Bytes value = encoder.encode(queryBindingSetString);
+ final Bytes value = encoder.encode(queryBindingSetValueString);
tx.set(row, col, value);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
index fbbae33..c2c031c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
@@ -20,9 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.app.export;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.openrdf.query.BindingSet;
-
import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
/**
* Exports a single Binding Set that is a new result for a SPARQL query to some
@@ -40,7 +39,7 @@ public interface IncrementalResultExporter {
* Fluo application. (not null)
* @throws ResultExportException The result could not be exported.
*/
- public void export(TypedTransactionBase tx, String queryId, BindingSet result) throws ResultExportException;
+ public void export(TypedTransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
/**
* A result could not be exported.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
index 4d51798..5c8c719 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
@@ -25,10 +25,10 @@ import java.util.Collections;
import org.apache.accumulo.core.client.Connector;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.openrdf.query.BindingSet;
import io.fluo.api.data.Bytes;
import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.PcjTables;
import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
@@ -55,7 +55,7 @@ public class RyaResultExporter implements IncrementalResultExporter {
public void export(
final TypedTransactionBase fluoTx,
final String queryId,
- final BindingSet result) throws ResultExportException {
+ final VisibilityBindingSet result) throws ResultExportException {
checkNotNull(fluoTx);
checkNotNull(queryId);
checkNotNull(result);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index aa944e4..9bd0148 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -31,13 +31,15 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.openrdf.query.BindingSet;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
import io.fluo.api.types.TypedObserver;
import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
/**
@@ -48,6 +50,7 @@ import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversi
@ParametersAreNonnullByDefault
public abstract class BindingSetUpdater extends TypedObserver {
+ private final Encoder encoder = new StringEncoder();
// DAO
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -74,9 +77,10 @@ public abstract class BindingSetUpdater extends TypedObserver {
checkNotNull(row);
checkNotNull(col);
- final Observation observation = parseObservation( tx, BindingSetRow.make(row) );
+ final String bindingSetString = encoder.decodeString(tx.get(row, col));
+ final Observation observation = parseObservation( tx, new BindingSetRow(BindingSetRow.make(row).getNodeId(), bindingSetString) );
final String observedNodeId = observation.getObservedNodeId();
- final BindingSet observedBindingSet = observation.getObservedBindingSet();
+ final VisibilityBindingSet observedBindingSet = observation.getObservedBindingSet();
final String parentNodeId = observation.getParentId();
// Figure out which node needs to handle the new metadata.
@@ -100,7 +104,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId);
try {
joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin);
- } catch (BindingSetConversionException e) {
+ } catch (final BindingSetConversionException e) {
throw new RuntimeException("Could not process a Join node.", e);
}
break;
@@ -117,7 +121,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
public static final class Observation {
private final String observedNodeId;
- private final BindingSet observedBindingSet;
+ private final VisibilityBindingSet observedBindingSet;
private final String parentNodeId;
/**
@@ -129,7 +133,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
*/
public Observation(
final String observedNodeId,
- final BindingSet observedBindingSet,
+ final VisibilityBindingSet observedBindingSet,
final String parentNodeId) {
this.observedNodeId = checkNotNull(observedNodeId);
this.observedBindingSet = checkNotNull(observedBindingSet);
@@ -146,7 +150,7 @@ public abstract class BindingSetUpdater extends TypedObserver {
/**
* @return A Binding Set that was just emitted.
*/
- public BindingSet getObservedBindingSet() {
+ public VisibilityBindingSet getObservedBindingSet() {
return observedBindingSet;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index 2accde3..fb15934 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.openrdf.query.BindingSet;
import io.fluo.api.client.TransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
/**
* Notified when the results of a Filter have been updated to include a new
@@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
*/
public class FilterObserver extends BindingSetUpdater {
- private final BindingSetStringConverter converter = new BindingSetStringConverter();
+ private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter();
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -57,7 +58,7 @@ public class FilterObserver extends BindingSetUpdater {
// Read the Binding Set that was just emitted by the Filter.
final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
- final BindingSet filterBindingSet = converter.convert(parsedRow.getBindingSetString(), filterVarOrder);
+ final VisibilityBindingSet filterBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), filterVarOrder);
// Figure out which node needs to handle the new metadata.
final String parentNodeId = filterMetadata.getParentNodeId();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 43b0a4e..a8cd0df 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.openrdf.query.BindingSet;
import io.fluo.api.client.TransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
/**
* Notified when the results of a Join have been updated to include a new
@@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
*/
public class JoinObserver extends BindingSetUpdater {
- private final BindingSetStringConverter converter = new BindingSetStringConverter();
+ private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter();
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -56,7 +57,7 @@ public class JoinObserver extends BindingSetUpdater {
// Read the Binding Set that was just emitted by the Join.
final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
- final BindingSet joinBindingSet = converter.convert(parsedRow.getBindingSetString(), joinVarOrder);
+ final VisibilityBindingSet joinBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), joinVarOrder);
// Figure out which node needs to handle the new metadata.
final String parentNodeId = joinMetadata.getParentNodeId();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 7c1a588..fe4dc56 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -29,17 +29,19 @@ import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.openrdf.query.BindingSet;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
import io.fluo.api.types.TypedObserver;
import io.fluo.api.types.TypedTransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
/**
* Performs incremental result exporting to the configured destinations.
@@ -47,9 +49,9 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
public class QueryResultObserver extends TypedObserver {
private static final Logger log = Logger.getLogger(QueryResultObserver.class);
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
- private final BindingSetStringConverter converter = new BindingSetStringConverter();
+ private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO();
+ private static final Encoder ENCODER = new StringEncoder();
+ private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
/**
* Builders for each type of result exporter we support.
@@ -93,16 +95,16 @@ public class QueryResultObserver extends TypedObserver {
@Override
public void process(final TypedTransactionBase tx, final Bytes row, final Column col) {
// Read the SPARQL query and its Binding Set from the row id.
- final String[] queryAndBindingSet = row.toString().split(NODEID_BS_DELIM);
+ final String[] queryAndBindingSet = ENCODER.decodeString(row).split(NODEID_BS_DELIM);
final String queryId = queryAndBindingSet[0];
- final String bindingSetString = queryAndBindingSet[1];
+ final String bindingSetString = ENCODER.decodeString(tx.get(row, col));
// Fetch the query's Variable Order from the Fluo table.
- final QueryMetadata queryMetadata = queryDao.readQueryMetadata(tx, queryId);
+ final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, queryId);
final VariableOrder varOrder = queryMetadata.getVariableOrder();
// Export the result using each of the provided exporters.
- BindingSet result = converter.convert(bindingSetString, varOrder);
+ final VisibilityBindingSet result = (VisibilityBindingSet) CONVERTER.convert(bindingSetString, varOrder);
for(final IncrementalResultExporter exporter : exporters) {
try {
exporter.export(tx, queryId, result);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index ddba9a2..7b1e510 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -27,8 +27,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.openrdf.query.BindingSet;
import io.fluo.api.client.TransactionBase;
-import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
/**
* Notified when the results of a Statement Pattern have been updated to include
@@ -37,7 +38,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
*/
public class StatementPatternObserver extends BindingSetUpdater {
- private final BindingSetStringConverter converter = new BindingSetStringConverter();
+ private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
// DAO
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -54,10 +55,11 @@ public class StatementPatternObserver extends BindingSetUpdater {
// Read the Statement Pattern metadata.
final String spNodeId = parsedRow.getNodeId();
final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId);
+ final String bindingSetValue = parsedRow.getBindingSetString();
// Read the Binding Set that was just emmitted by the Statement Pattern.
final VariableOrder spVarOrder = spMetadata.getVariableOrder();
- final BindingSet spBindingSet = converter.convert(parsedRow.getBindingSetString(), spVarOrder);
+ final VisibilityBindingSet spBindingSet = (VisibilityBindingSet) CONVERTER.convert(bindingSetValue, spVarOrder);
// Figure out which node needs to handle the new metadata.
final String parentNodeId = spMetadata.getParentNodeId();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index d43ffc9..496c0ed 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -40,8 +40,13 @@ import io.fluo.api.data.Column;
import io.fluo.api.data.Span;
import io.fluo.api.iterator.ColumnIterator;
import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.types.Encoder;
+import io.fluo.api.types.StringEncoder;
import io.fluo.api.types.TypedObserver;
import io.fluo.api.types.TypedTransactionBase;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+import mvm.rya.indexing.external.tupleSet.VisibilityBindingSetStringConverter;
/**
* An observer that matches new Triples to the Statement Patterns that are part
@@ -50,7 +55,9 @@ import io.fluo.api.types.TypedTransactionBase;
*/
public class TripleObserver extends TypedObserver {
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+ private static final Encoder ENCODER = new StringEncoder();
+ private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO();
+ private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
public TripleObserver() {
super(new StringTypeLayer());
@@ -65,6 +72,11 @@ public class TripleObserver extends TypedObserver {
public void process(final TypedTransactionBase tx, final Bytes row, final Column column) {
//get string representation of triple
final String triple = IncUpdateDAO.getTripleString(row);
+ final Bytes visiBytes = tx.get(row, FluoQueryColumns.TRIPLES);
+ String visibility = "";
+ if(visiBytes != null) {
+ visibility = ENCODER.decodeString(visiBytes);
+ }
//get variable metadata for all SP in table
final ScannerConfiguration sc1 = new ScannerConfiguration();
@@ -75,19 +87,25 @@ public class TripleObserver extends TypedObserver {
final RowIterator ri = tx.get(sc1);
while(ri.hasNext()) {
-
final Entry<Bytes, ColumnIterator> next = ri.next();
final ColumnIterator ci = next.getValue();
final String spID = next.getKey().toString();
- final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spID);
+ final StatementPatternMetadata spMetadata = QUERY_DAO.readStatementPatternMetadata(tx, spID);
final String pattern = spMetadata.getStatementPattern();
while(ci.hasNext()) {
final String varOrders = ci.next().getValue().toString();
- final String bindingSet = getBindingSet(triple, pattern, varOrders);
- if(bindingSet.length() != 0) {
- tx.mutate().row(spID + NODEID_BS_DELIM + bindingSet).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(bindingSet);
+ final VariableOrder varOrder = new VariableOrder(varOrders);
+ final String bindingSetString = getBindingSet(triple, pattern, varOrders);
+
+ // The triple matched the Statement Pattern, so a binding set was produced.
+ if(bindingSetString.length() != 0) {
+ final VisibilityBindingSet bindingSet = new VisibilityBindingSet(
+ CONVERTER.convert(bindingSetString, varOrder),
+ visibility);
+ final String valueString = CONVERTER.convert(bindingSet, varOrder);
+ tx.mutate().row(spID + NODEID_BS_DELIM + bindingSetString).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(valueString);
}
}
}
@@ -96,8 +114,16 @@ public class TripleObserver extends TypedObserver {
tx.delete(row, column);
}
- //determines whether triple matches SPID conditions and generates bindingset
- //whose order is determined by varOrder
+ /**
+ * Determines whether the triple matches the Statement Pattern and, if
+ * so, generates a string representation of a BindingSet whose order
+ * is determined by varOrder.
+ * @param triple - The triple to consider.
+ * @param spID - The Statement Pattern to match against (the caller passes the pattern string, not a node ID)
+ * @param varOrder - The variable order
+ * @return The string representation of the BindingSet or an empty string,
+ * signifying the triple did not match the Statement Pattern.
+ */
private static String getBindingSet(final String triple, final String spID, final String varOrder) {
final String[] spIdArray = spID.split(DELIM);
final String[] tripleArray = triple.split(DELIM);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index fa25456..be24ac9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -88,7 +88,7 @@ public class FluoQueryColumns {
* <p>
* <table border="1" style="width:100%">
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
- * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>empty</td> </tr>
+ * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>visibility</td> </tr>
* </table>
*/
public static final Column TRIPLES = new Column("triples", "SPO");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 265ca0f..83985da 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -205,8 +205,8 @@ public class FluoQueryMetadataDAO {
final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER));
final VariableOrder varOrder = new VariableOrder(varOrderString);
- String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) );
- JoinType joinType = JoinType.valueOf(joinTypeString);
+ final String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) );
+ final JoinType joinType = JoinType.valueOf(joinTypeString);
final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) );
final String leftChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) );
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
index 025c3e7..08d5cef 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
@@ -37,6 +37,8 @@ import org.openrdf.query.impl.MapBindingSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
+
/**
* Tests the methods of {@link LeftOuterJoin}.
*/
@@ -46,128 +48,134 @@ public class LeftOuterJoinTest {
@Test
public void newLeftResult_noRightMatches() {
- IterativeJoin leftOuterJoin = new LeftOuterJoin();
+ final IterativeJoin leftOuterJoin = new LeftOuterJoin();
// There is a new left result.
- MapBindingSet newLeftResult = new MapBindingSet();
- newLeftResult.addBinding("name", vf.createLiteral("Bob"));
+ final MapBindingSet mapLeftResult = new MapBindingSet();
+ mapLeftResult.addBinding("name", vf.createLiteral("Bob"));
+ final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult);
// There are no right results that join with the left result.
- Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator();
+ final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator();
// Therefore, the left result is a new join result.
- Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
- Set<BindingSet> newJoinResults = new HashSet<>();
+ final Set<BindingSet> newJoinResults = new HashSet<>();
while(newJoinResultsIt.hasNext()) {
newJoinResults.add( newJoinResultsIt.next() );
}
- Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult );
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult );
assertEquals(expected, newJoinResults);
}
@Test
public void newLeftResult_joinsWithRightResults() {
- IterativeJoin leftOuterJoin = new LeftOuterJoin();
+ final IterativeJoin leftOuterJoin = new LeftOuterJoin();
// There is a new left result.
- MapBindingSet newLeftResult = new MapBindingSet();
- newLeftResult.addBinding("name", vf.createLiteral("Bob"));
- newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+ final MapBindingSet mapLeftResult = new MapBindingSet();
+ mapLeftResult.addBinding("name", vf.createLiteral("Bob"));
+ mapLeftResult.addBinding("height", vf.createLiteral("5'9\""));
+ final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult);
// There are a few right results that join with the left result.
- MapBindingSet nameAge = new MapBindingSet();
+ final MapBindingSet nameAge = new MapBindingSet();
nameAge.addBinding("name", vf.createLiteral("Bob"));
nameAge.addBinding("age", vf.createLiteral(56));
+ final VisibilityBindingSet visiAge = new VisibilityBindingSet(nameAge);
- MapBindingSet nameHair = new MapBindingSet();
+ final MapBindingSet nameHair = new MapBindingSet();
nameHair.addBinding("name", vf.createLiteral("Bob"));
nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
+ final VisibilityBindingSet visiHair = new VisibilityBindingSet(nameHair);
- Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+ final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList(visiAge, visiHair).iterator();
// Therefore, there are a few new join results that mix the two together.
- Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
- Set<BindingSet> newJoinResults = new HashSet<>();
+ final Set<BindingSet> newJoinResults = new HashSet<>();
while(newJoinResultsIt.hasNext()) {
newJoinResults.add( newJoinResultsIt.next() );
}
- Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
- MapBindingSet nameHeightAge = new MapBindingSet();
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ final MapBindingSet nameHeightAge = new MapBindingSet();
nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
nameHeightAge.addBinding("age", vf.createLiteral(56));
- expected.add(nameHeightAge);
+ expected.add(new VisibilityBindingSet(nameHeightAge));
- MapBindingSet nameHeightHair = new MapBindingSet();
+ final MapBindingSet nameHeightHair = new MapBindingSet();
nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
- expected.add(nameHeightHair);
+ expected.add(new VisibilityBindingSet(nameHeightHair));
assertEquals(expected, newJoinResults);
}
@Test
public void newRightResult_noLeftMatches() {
- IterativeJoin leftOuterJoin = new LeftOuterJoin();
+ final IterativeJoin leftOuterJoin = new LeftOuterJoin();
// There are no left results.
- Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator();
+ final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator();
// There is a new right result.
- MapBindingSet newRightResult = new MapBindingSet();
+ final MapBindingSet newRightResult = new MapBindingSet();
newRightResult.addBinding("name", vf.createLiteral("Bob"));
// Therefore, there are no new join results.
- Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
assertFalse( newJoinResultsIt.hasNext() );
}
@Test
public void newRightResult_joinsWithLeftResults() {
- IterativeJoin leftOuterJoin = new LeftOuterJoin();
+ final IterativeJoin leftOuterJoin = new LeftOuterJoin();
// There are a few left results that join with the new right result.
- MapBindingSet nameAge = new MapBindingSet();
+ final MapBindingSet nameAge = new MapBindingSet();
nameAge.addBinding("name", vf.createLiteral("Bob"));
nameAge.addBinding("age", vf.createLiteral(56));
- MapBindingSet nameHair = new MapBindingSet();
+ final MapBindingSet nameHair = new MapBindingSet();
nameHair.addBinding("name", vf.createLiteral("Bob"));
nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
- Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+ final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList(
+ new VisibilityBindingSet(nameAge),
+ new VisibilityBindingSet(nameHair)).iterator();
// There is a new right result.
- MapBindingSet newRightResult = new MapBindingSet();
+ final MapBindingSet newRightResult = new MapBindingSet();
newRightResult.addBinding("name", vf.createLiteral("Bob"));
newRightResult.addBinding("height", vf.createLiteral("5'9\""));
// Therefore, there are a few new join results that mix the two together.
- Iterator<BindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, newRightResult);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
- Set<BindingSet> newJoinResults = new HashSet<>();
+ final Set<BindingSet> newJoinResults = new HashSet<>();
while(newJoinResultsIt.hasNext()) {
newJoinResults.add( newJoinResultsIt.next() );
}
- Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
- MapBindingSet nameHeightAge = new MapBindingSet();
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ final MapBindingSet nameHeightAge = new MapBindingSet();
nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
nameHeightAge.addBinding("age", vf.createLiteral(56));
- expected.add(nameHeightAge);
+ expected.add(new VisibilityBindingSet(nameHeightAge));
- MapBindingSet nameHeightHair = new MapBindingSet();
+ final MapBindingSet nameHeightHair = new MapBindingSet();
nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
- expected.add(nameHeightHair);
+ expected.add(new VisibilityBindingSet(nameHeightHair));
assertEquals(expected, newJoinResults);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
index 15023c5..651ea11 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
@@ -37,6 +37,8 @@ import org.openrdf.query.impl.MapBindingSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
+
/**
* Tests the methods of {@link NaturalJoin}.
*/
@@ -46,120 +48,124 @@ public class NaturalJoinTest {
@Test
public void newLeftResult_noRightMatches() {
- IterativeJoin naturalJoin = new NaturalJoin();
+ final IterativeJoin naturalJoin = new NaturalJoin();
// There is a new left result.
- MapBindingSet newLeftResult = new MapBindingSet();
+ final MapBindingSet newLeftResult = new MapBindingSet();
newLeftResult.addBinding("name", vf.createLiteral("Bob"));
// There are no right results that join with the left result.
- Iterator<BindingSet> rightResults= new ArrayList<BindingSet>().iterator();
+ final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator();
// Therefore, the left result is a new join result.
- Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults);
assertFalse( newJoinResultsIt.hasNext() );
}
@Test
public void newLeftResult_joinsWithRightResults() {
- IterativeJoin naturalJoin = new NaturalJoin();
+ final IterativeJoin naturalJoin = new NaturalJoin();
// There is a new left result.
- MapBindingSet newLeftResult = new MapBindingSet();
+ final MapBindingSet newLeftResult = new MapBindingSet();
newLeftResult.addBinding("name", vf.createLiteral("Bob"));
newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
// There are a few right results that join with the left result.
- MapBindingSet nameAge = new MapBindingSet();
+ final MapBindingSet nameAge = new MapBindingSet();
nameAge.addBinding("name", vf.createLiteral("Bob"));
nameAge.addBinding("age", vf.createLiteral(56));
- MapBindingSet nameHair = new MapBindingSet();
+ final MapBindingSet nameHair = new MapBindingSet();
nameHair.addBinding("name", vf.createLiteral("Bob"));
nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
- Iterator<BindingSet> rightResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+ final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList(
+ new VisibilityBindingSet(nameAge),
+ new VisibilityBindingSet(nameHair)).iterator();
// Therefore, there are a few new join results that mix the two together.
- Iterator<BindingSet> newJoinResultsIt = naturalJoin.newLeftResult(newLeftResult, rightResults);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults);
- Set<BindingSet> newJoinResults = new HashSet<>();
+ final Set<BindingSet> newJoinResults = new HashSet<>();
while(newJoinResultsIt.hasNext()) {
newJoinResults.add( newJoinResultsIt.next() );
}
- Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
- MapBindingSet nameHeightAge = new MapBindingSet();
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ final MapBindingSet nameHeightAge = new MapBindingSet();
nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
nameHeightAge.addBinding("age", vf.createLiteral(56));
- expected.add(nameHeightAge);
+ expected.add(new VisibilityBindingSet(nameHeightAge));
- MapBindingSet nameHeightHair = new MapBindingSet();
+ final MapBindingSet nameHeightHair = new MapBindingSet();
nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
- expected.add(nameHeightHair);
+ expected.add(new VisibilityBindingSet(nameHeightHair));
assertEquals(expected, newJoinResults);
}
@Test
public void newRightResult_noLeftMatches() {
- IterativeJoin naturalJoin = new NaturalJoin();
+ final IterativeJoin naturalJoin = new NaturalJoin();
// There are no left results.
- Iterator<BindingSet> leftResults= new ArrayList<BindingSet>().iterator();
+ final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator();
// There is a new right result.
- MapBindingSet newRightResult = new MapBindingSet();
+ final MapBindingSet newRightResult = new MapBindingSet();
newRightResult.addBinding("name", vf.createLiteral("Bob"));
// Therefore, there are no new join results.
- Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
assertFalse( newJoinResultsIt.hasNext() );
}
@Test
public void newRightResult_joinsWithLeftResults() {
- IterativeJoin naturalJoin = new NaturalJoin();
+ final IterativeJoin naturalJoin = new NaturalJoin();
// There are a few left results that join with the new right result.
- MapBindingSet nameAge = new MapBindingSet();
+ final MapBindingSet nameAge = new MapBindingSet();
nameAge.addBinding("name", vf.createLiteral("Bob"));
nameAge.addBinding("age", vf.createLiteral(56));
- MapBindingSet nameHair = new MapBindingSet();
+ final MapBindingSet nameHair = new MapBindingSet();
nameHair.addBinding("name", vf.createLiteral("Bob"));
nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
- Iterator<BindingSet> leftResults = Lists.<BindingSet>newArrayList(nameAge, nameHair).iterator();
+ final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList(
+ new VisibilityBindingSet(nameAge),
+ new VisibilityBindingSet(nameHair)).iterator();
// There is a new right result.
- MapBindingSet newRightResult = new MapBindingSet();
+ final MapBindingSet newRightResult = new MapBindingSet();
newRightResult.addBinding("name", vf.createLiteral("Bob"));
newRightResult.addBinding("height", vf.createLiteral("5'9\""));
// Therefore, there are a few new join results that mix the two together.
- Iterator<BindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, newRightResult);
+ final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
- Set<BindingSet> newJoinResults = new HashSet<>();
+ final Set<BindingSet> newJoinResults = new HashSet<>();
while(newJoinResultsIt.hasNext()) {
newJoinResults.add( newJoinResultsIt.next() );
}
- Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
- MapBindingSet nameHeightAge = new MapBindingSet();
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
+ final MapBindingSet nameHeightAge = new MapBindingSet();
nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
nameHeightAge.addBinding("age", vf.createLiteral(56));
- expected.add(nameHeightAge);
+ expected.add(new VisibilityBindingSet(nameHeightAge));
- MapBindingSet nameHeightHair = new MapBindingSet();
+ final MapBindingSet nameHeightHair = new MapBindingSet();
nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
- expected.add(nameHeightHair);
+ expected.add(new VisibilityBindingSet(nameHeightHair));
assertEquals(expected, newJoinResults);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
index 3168a71..7be539a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java
@@ -30,6 +30,8 @@ import org.openrdf.rio.RDFHandlerException;
import org.openrdf.rio.RDFParser;
import org.openrdf.rio.helpers.RDFHandlerBase;
+import com.google.common.base.Optional;
+
import io.fluo.api.client.FluoClient;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.RdfToRyaConversions;
@@ -68,7 +70,7 @@ public class FluoLoader extends RDFHandlerBase {
// If the buffer is full, flush it to the Fluo table.
if(buff.size() == FLUSH_SIZE) {
log.trace("Flushing " + buff.size() + " Statements from the buffer to Fluo.");
- insertTriples.insert(fluoClient, buff);
+ insertTriples.insert(fluoClient, buff, Optional.<String>absent());
buff.clear();
}
@@ -83,7 +85,7 @@ public class FluoLoader extends RDFHandlerBase {
if(!buff.isEmpty()) {
log.trace("Flushing the last " + buff.size() + " Statements from the buffer to Fluo.");
- insertTriples.insert(fluoClient, buff);
+ insertTriples.insert(fluoClient, buff, Optional.<String>absent());
buff.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 0cbfa9a..eb7fb17 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -48,6 +48,7 @@ import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;
import org.openrdf.sail.SailException;
+import com.google.common.base.Optional;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -60,7 +61,6 @@ import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.resolver.RyaToRdfConversions;
-import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.PcjTables;
@@ -77,7 +77,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class);
private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
-
+
// Employees
private static final RyaURI alice = new RyaURI("http://Alice");
private static final RyaURI bob = new RyaURI("http://Bob");
@@ -290,7 +290,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
private static void loadDataIntoFluo(final FluoClient fluoClient, final Set<RyaStatement> statements) {
final InsertTriples insertTriples = new InsertTriples();
for(final RyaStatement statement : statements) {
- insertTriples.insert(fluoClient, statement);
+ insertTriples.insert(fluoClient, statement, Optional.<String>absent());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 41c4f08..566d2d2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.rya.indexing.pcj.fluo.ITBase;
import org.junit.Test;
+import com.google.common.base.Optional;
import com.google.common.io.Files;
import io.fluo.api.client.FluoFactory;
@@ -75,7 +76,7 @@ public class CountStatementsIT extends ITBase {
triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() );
- new InsertTriples().insert(fluoClient, triples);
+ new InsertTriples().insert(fluoClient, triples, Optional.<String>absent());
// Load some statements into the Fluo app.
final BigInteger count = new CountStatements().countStatements(fluoClient);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 157412a..0e766b1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -33,6 +33,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.junit.Test;
+import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import mvm.rya.api.domain.RyaStatement;
@@ -77,7 +78,7 @@ public class GetQueryReportIT extends ITBase {
new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Wait for the results to finish processing.
fluo.waitForObservers();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index 41c2d7d..6e633f6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -33,6 +33,7 @@ import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.BindingImpl;
+import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import mvm.rya.api.domain.RyaStatement;
@@ -135,7 +136,7 @@ public class InputIT extends ITBase {
assertTrue( results.isEmpty() );
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Verify the end results of the query match the expected results.
fluo.waitForObservers();
@@ -187,7 +188,7 @@ public class InputIT extends ITBase {
assertEquals(expected, results);
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Verify the end results of the query also include Frank.
fluo.waitForObservers();
@@ -244,7 +245,7 @@ public class InputIT extends ITBase {
assertEquals(expected, results);
// Stream the same Alice triple into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Verify the end results of the query is still only Alice.
fluo.waitForObservers();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 46db8cd..f408a1c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -33,6 +33,7 @@ import org.openrdf.model.vocabulary.XMLSchema;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.BindingImpl;
+import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import mvm.rya.api.domain.RyaStatement;
@@ -81,7 +82,7 @@ public class QueryIT extends ITBase {
new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Verify the end results of the query match the expected results.
fluo.waitForObservers();
@@ -163,7 +164,7 @@ public class QueryIT extends ITBase {
new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Verify the end results of the query match the expected results.
fluo.waitForObservers();
@@ -224,7 +225,7 @@ public class QueryIT extends ITBase {
new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Verify the end results of the query match the expected results.
fluo.waitForObservers();
@@ -268,7 +269,7 @@ public class QueryIT extends ITBase {
new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Verify the end results of the query match the expected results.
fluo.waitForObservers();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index ee3fffd..b75e624 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -43,6 +43,7 @@ import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.BindingImpl;
+import com.google.common.base.Optional;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -50,7 +51,6 @@ import com.google.common.collect.Sets;
import io.fluo.api.client.Snapshot;
import io.fluo.api.data.Bytes;
import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
import mvm.rya.indexing.external.tupleSet.PcjTables;
@@ -66,7 +66,7 @@ import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
public class RyaExportIT extends ITBase {
private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
-
+
/**
* Configure the export observer to use the Mini Accumulo instance as the
* export destination for new PCJ results.
@@ -138,7 +138,7 @@ public class RyaExportIT extends ITBase {
new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
// Stream the data into Fluo.
- new InsertTriples().insert(fluoClient, streamedTriples);
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent());
// Fetch the exported results from Accumulo once the observers finish working.
fluo.waitForObservers();