You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/19 00:05:41 UTC
[1/2] incubator-rya git commit: RYA-283-Batch-Observer-Integration.
Closes #198.
Repository: incubator-rya
Updated Branches:
refs/heads/master e387818ba -> ad60aca8d
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 3ee07a7..f9f55d0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -428,7 +428,6 @@ public class KafkaExportIT extends KafkaExportITBase {
// Verify the end results of the query match the expected results.
final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
- System.out.println(results);
assertEquals(expectedResults, results);
}
@@ -501,7 +500,6 @@ public class KafkaExportIT extends KafkaExportITBase {
// Verify the end results of the query match the expected results.
final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
- System.out.println(results);
assertEquals(expectedResults, results);
}
@@ -582,7 +580,6 @@ public class KafkaExportIT extends KafkaExportITBase {
// Verify the end results of the query match the expected results.
final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location"));
- System.out.println(results);
assertEquals(expectedResults, results);
}
[2/2] incubator-rya git commit: RYA-283-Batch-Observer-Integration.
Closes #198.
Posted by ca...@apache.org.
RYA-283-Batch-Observer-Integration. Closes #198.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/ad60aca8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/ad60aca8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/ad60aca8
Branch: refs/heads/master
Commit: ad60aca8d8a1002b528aeaa4376e86925dd0b2e0
Parents: e387818
Author: Caleb Meier <ca...@parsons.com>
Authored: Sat Aug 5 16:53:49 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Fri Aug 18 17:04:45 2017 -0700
----------------------------------------------------------------------
.../indexing/pcj/fluo/api/CreateFluoPcj.java | 17 +-
.../pcj/fluo/app/JoinResultUpdater.java | 146 ++++---
.../batch/AbstractBatchBindingSetUpdater.java | 2 +-
.../fluo/app/batch/JoinBatchInformation.java | 36 +-
.../JoinBatchInformationTypeAdapter.java | 4 +-
.../pcj/fluo/app/query/FluoQueryColumns.java | 3 +
.../fluo/app/query/FluoQueryMetadataDAO.java | 4 +
.../pcj/fluo/app/query/JoinMetadata.java | 34 +-
.../pcj/fluo/app/query/QueryMetadata.java | 22 +
.../fluo/app/query/SparqlFluoQueryBuilder.java | 12 +
.../pcj/fluo/app/util/FluoQueryUtils.java | 12 +
.../pcj/fluo/app/util/NodeIdCollector.java | 92 ++++
.../BatchInformationSerializerTest.java | 5 +-
.../fluo/app/query/QueryBuilderVisitorTest.java | 4 -
.../pcj/fluo/integration/BatchDeleteIT.java | 316 --------------
.../indexing/pcj/fluo/integration/BatchIT.java | 424 +++++++++++++++++++
.../pcj/fluo/integration/KafkaExportIT.java | 3 -
17 files changed, 702 insertions(+), 434 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
index e450960..150a256 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -47,9 +47,7 @@ import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -90,6 +88,10 @@ public class CreateFluoPcj {
* The default Statement Pattern batch insert size is 1000.
*/
private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
+ /**
+ * The default Join batch size is 5000.
+ */
+ private static final int DEFAULT_JOIN_BATCH_SIZE = 5000;
/**
* The maximum number of binding sets that will be inserted into each Statement
@@ -98,11 +100,15 @@ public class CreateFluoPcj {
private final int spInsertBatchSize;
/**
+ * The maximum number of join results that will be processed per transaction.
+ */
+ private final int joinBatchSize;
+ /**
* Constructs an instance of {@link CreateFluoPcj} that uses
* {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size.
*/
public CreateFluoPcj() {
- this(DEFAULT_SP_INSERT_BATCH_SIZE);
+ this(DEFAULT_SP_INSERT_BATCH_SIZE, DEFAULT_JOIN_BATCH_SIZE);
}
/**
@@ -111,9 +117,11 @@ public class CreateFluoPcj {
* @param spInsertBatchSize - The maximum number of binding sets that will be
* inserted into each Statement Pattern's result set per Fluo transaction.
*/
- public CreateFluoPcj(final int spInsertBatchSize) {
+ public CreateFluoPcj(final int spInsertBatchSize, final int joinBatchSize) {
checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
+ checkArgument(joinBatchSize > 0, "The Join batch size '" + joinBatchSize + "' must be greater than 0.");
this.spInsertBatchSize = spInsertBatchSize;
+ this.joinBatchSize = joinBatchSize;
}
@@ -173,6 +181,7 @@ public class CreateFluoPcj {
SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
builder.setFluoQueryId(queryId);
builder.setSparql(sparql);
+ builder.setJoinBatchSize(joinBatchSize);
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 0f448a6..fb3ee0c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -23,6 +23,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -33,14 +34,18 @@ import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.indexing.pcj.fluo.app.batch.AbstractBatchBindingSetUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -51,7 +56,6 @@ import org.openrdf.query.impl.MapBindingSet;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -122,14 +126,17 @@ public class JoinResultUpdater {
}
// Iterates over the sibling node's BindingSets that join with the new binding set.
- final FluoTableIterator siblingBindingSets = makeSiblingScanIterator(childNodeId, childBindingSet, siblingId, tx);
-
+ Set<VisibilityBindingSet> siblingBindingSets = new HashSet<>();
+ Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, siblingId);
+ Column siblingColumn = getScanColumnFamily(siblingId);
+ Optional<RowColumn> rowColumn = fillSiblingBatch(tx, siblingSpan, siblingColumn, siblingBindingSets, joinMetadata.getJoinBatchSize());
+
// Iterates over the resulting BindingSets from the join.
final Iterator<VisibilityBindingSet> newJoinResults;
if(emittingSide == Side.LEFT) {
- newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets);
+ newJoinResults = joinAlgorithm.newLeftResult(childBindingSet, siblingBindingSets.iterator());
} else {
- newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets, childBindingSet);
+ newJoinResults = joinAlgorithm.newRightResult(siblingBindingSets.iterator(), childBindingSet);
}
// Insert the new join binding sets to the Fluo table.
@@ -152,6 +159,22 @@ public class JoinResultUpdater {
tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, nodeValueBytes);
}
}
+
+ // if batch limit met, there are additional entries to process
+ // update the span and register updated batch job
+ if (rowColumn.isPresent()) {
+ Span newSpan = AbstractBatchBindingSetUpdater.getNewSpan(rowColumn.get(), siblingSpan);
+ JoinBatchInformation joinBatch = JoinBatchInformation.builder()
+ .setBatchSize(joinMetadata.getJoinBatchSize())
+ .setBs(childBindingSet)
+ .setColumn(siblingColumn)
+ .setJoinType(joinMetadata.getJoinType())
+ .setSide(emittingSide)
+ .setSpan(newSpan)
+ .setTask(Task.Add)
+ .build();
+ BatchInformationDAO.addBatch(tx, joinMetadata.getNodeId(), joinBatch);
+ }
}
/**
@@ -160,8 +183,55 @@ public class JoinResultUpdater {
public static enum Side {
LEFT, RIGHT;
}
+
+
+ /**
+ * Fetches batch to be processed by scanning over the Span specified by the
+ * {@link JoinBatchInformation}. The number of results is less than or equal
+ * to the batch size specified by the JoinBatchInformation.
+ *
+ * @param tx - Fluo transaction in which batch operation is performed
+ * @param siblingSpan - span of sibling to retrieve elements to join with
+ * @param bsSet- set that batch results are added to
+ * @return Set - containing results of sibling scan.
+ * @throws Exception
+ */
+ private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, Span siblingSpan, Column siblingColumn, Set<VisibilityBindingSet> bsSet, int batchSize) throws Exception {
+
+ RowScanner rs = tx.scanner().over(siblingSpan).fetch(siblingColumn).byRow().build();
+ Iterator<ColumnScanner> colScannerIter = rs.iterator();
+
+ boolean batchLimitMet = false;
+ Bytes row = siblingSpan.getStart().getRow();
+ while (colScannerIter.hasNext() && !batchLimitMet) {
+ ColumnScanner colScanner = colScannerIter.next();
+ row = colScanner.getRow();
+ Iterator<ColumnValue> iter = colScanner.iterator();
+ while (iter.hasNext() && !batchLimitMet) {
+ bsSet.add(BS_SERDE.deserialize(iter.next().getValue()));
+ //check if batch size has been met and set flag if it has been met
+ if (bsSet.size() >= batchSize) {
+ batchLimitMet = true;
+ }
+ }
+ }
- private FluoTableIterator makeSiblingScanIterator(final String childId, final BindingSet childBindingSet, final String siblingId, final TransactionBase tx) throws BindingSetConversionException {
+ if (batchLimitMet) {
+ return Optional.of(new RowColumn(row, siblingColumn));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ /**
+ * Creates a Span for the sibling node to retrieve BindingSets to join with
+ * @param tx
+ * @param childId - Id of the node that was updated
+ * @param childBindingSet - BindingSet update
+ * @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update
+ * @return Span to retrieve sibling node's BindingSets to form join results
+ */
+ private Span getSpan(TransactionBase tx, final String childId, final BindingSet childBindingSet, final String siblingId) {
// Get the common variable orders. These are used to build the prefix.
final VariableOrder childVarOrder = getVarOrder(tx, childId);
final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
@@ -184,15 +254,7 @@ public class JoinResultUpdater {
}
}
siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix;
-
- // Scan the sibling node's binding sets for those that have the same
- // common variable values as childBindingSet. These needs to be joined
- // and inserted into the Join's results. It's possible that none of these
- // results will be new Join results if they have already been created in
- // earlier iterations of this algorithm.
-
- final RowScanner rs = tx.scanner().over(Span.prefix(siblingScanPrefix)).fetch(getScanColumnFamily(siblingId)).byRow().build();
- return new FluoTableIterator(rs);
+ return Span.prefix(siblingScanPrefix);
}
@@ -468,56 +530,4 @@ public class JoinResultUpdater {
}
}
- /**
- * Iterates over rows that have a Binding Set column and returns the unmarshalled
- * {@link BindingSet}s.
- */
- private static final class FluoTableIterator implements Iterator<VisibilityBindingSet> {
-
- private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
-
- private static final Set<Column> BINDING_SET_COLUMNS = Sets.newHashSet(
- FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET,
- FluoQueryColumns.JOIN_BINDING_SET,
- FluoQueryColumns.FILTER_BINDING_SET);
-
- private final Iterator<ColumnScanner> rows;
-
- /**
- * Constructs an instance of {@link FluoTableIterator}.
- *
- * @param rows - Iterates over RowId values in a Fluo Table. (not null)
- */
- public FluoTableIterator(final RowScanner rows) {
- this.rows = checkNotNull(rows).iterator();
- }
-
- @Override
- public boolean hasNext() {
- return rows.hasNext();
- }
-
- @Override
- public VisibilityBindingSet next() {
- final ColumnScanner columns = rows.next();
-
- for (final ColumnValue cv : columns) {
- if(BINDING_SET_COLUMNS.contains(cv.getColumn())) {
- final Bytes value = cv.getValue();
- try {
- return BS_SERDE.deserialize(value);
- } catch (final Exception e) {
- throw new RuntimeException("Row did not containing a Binding Set.", e);
- }
- }
- }
-
- throw new RuntimeException("Row did not containing a Binding Set.");
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove() is unsupported.");
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
index db33d3b..9584a10 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
@@ -38,7 +38,7 @@ public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetU
* @param oldSpan - old Span to be updated with newStart
* @return - updated Span used with an updated BatchInformation object to complete the batch task
*/
- public Span getNewSpan(RowColumn newStart, Span oldSpan) {
+ public static Span getNewSpan(RowColumn newStart, Span oldSpan) {
return new Span(newStart, oldSpan.isStartInclusive(), oldSpan.getEnd(), oldSpan.isEndInclusive());
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
index 71ac557..d049ff0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -37,8 +37,7 @@ import jline.internal.Preconditions;
* VariableOrder are specified. This is so that the sibling node (the node that
* wasn't updated) can be scanned to obtain results that can be joined with the
* VisibilityBindingSet. The assumption here is that the Span is derived from
- * the {@link Binding}s of common variables between the join children, with
- * Values ordered according to the indicated {@link VariableOrder}. This class
+ * the {@link Binding}s of common variables between the join children. This class
* represents a batch order to perform a given task on join BindingSet results.
* The {@link Task} is to Add, Delete, or Update. This batch order is processed
* by the {@link BatchObserver} and used with the nodeId provided to the
@@ -54,7 +53,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater();
private VisibilityBindingSet bs; //update for join child indicated by side
- private VariableOrder varOrder; //variable order for child indicated by Span
private Side side; //join child that was updated by bs
private JoinType join;
/**
@@ -63,20 +61,18 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
* @param column - Column of join child to be scanned
* @param span - span of join child to be scanned (derived from common variables of left and right join children)
* @param bs - BindingSet to be joined with results of child scan
- * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span)
* @param side - The side of the child that the VisibilityBindingSet update occurred at
* @param join - JoinType (left, right, natural inner)
*/
- public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+ public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) {
super(batchSize, task, column, span);
this.bs = Preconditions.checkNotNull(bs);
- this.varOrder = Preconditions.checkNotNull(varOrder);
this.side = Preconditions.checkNotNull(side);
this.join = Preconditions.checkNotNull(join);
}
- public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
- this(DEFAULT_BATCH_SIZE, task, column, span, bs, varOrder, side, join);
+ public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) {
+ this(DEFAULT_BATCH_SIZE, task, column, span, bs, side, join);
}
/**
@@ -95,13 +91,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
return join;
}
- /**
- * Returns the VariableOrder for the join child corresponding to the Span.
- * @return {@link VariableOrder} used to join {@link VisibilityBindingSet}s.
- */
- public VariableOrder getVarOrder() {
- return varOrder;
- }
/**
* Sets the VisibilityBindingSet that represents an update to the join child. The join child
@@ -129,7 +118,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
.append(" Batch Size: " + super.getBatchSize() + "\n")
.append(" Task: " + super.getTask() + "\n")
.append(" Column: " + super.getColumn() + "\n")
- .append(" VariableOrder: " + varOrder + "\n")
.append(" Join Type: " + join + "\n")
.append(" Join Side: " + side + "\n")
.append(" Binding Set: " + bs + "\n")
@@ -149,12 +137,12 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
JoinBatchInformation batch = (JoinBatchInformation) other;
return super.equals(other) && Objects.equals(this.bs, batch.bs) && Objects.equals(this.join, batch.join)
- && Objects.equals(this.side, batch.side) && Objects.equals(this.varOrder, batch.varOrder);
+ && Objects.equals(this.side, batch.side);
}
@Override
public int hashCode() {
- return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side, varOrder);
+ return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side);
}
@@ -169,7 +157,6 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
private Column column;
private Span span;
private VisibilityBindingSet bs;
- private VariableOrder varOrder;
private JoinType join;
private Side side;
@@ -237,19 +224,10 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation {
}
/**
- * Sets the variable order for the join child corresponding to the Span
- * @param varOrder - Variable order used to join BindingSet with result of scan
- */
- public Builder setVarOrder(VariableOrder varOrder) {
- this.varOrder = varOrder;
- return this;
- }
-
- /**
* @return an instance of {@link JoinBatchInformation} constructed from the parameters passed to this Builder
*/
public JoinBatchInformation build() {
- return new JoinBatchInformation(batchSize, task, column, span, bs, varOrder, side, join);
+ return new JoinBatchInformation(batchSize, task, column, span, bs, side, join);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
index 9f3f1a6..f42a2ba 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
@@ -60,7 +60,6 @@ public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatch
result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
- result.add("varOrder", new JsonPrimitive(Joiner.on(";").join(batch.getVarOrder().getVariableOrders())));
result.add("side", new JsonPrimitive(batch.getSide().name()));
result.add("joinType", new JsonPrimitive(batch.getJoinType().name()));
String updateVarOrderString = Joiner.on(";").join(batch.getBs().getBindingNames());
@@ -82,12 +81,11 @@ public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatch
boolean startInc = json.get("startInc").getAsBoolean();
boolean endInc = json.get("endInc").getAsBoolean();
Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
- VariableOrder varOrder = new VariableOrder(json.get("varOrder").getAsString());
VariableOrder updateVarOrder = new VariableOrder(json.get("updateVarOrder").getAsString());
VisibilityBindingSet bs = converter.convert(json.get("bindingSet").getAsString(), updateVarOrder);
Side side = Side.valueOf(json.get("side").getAsString());
JoinType join = JoinType.valueOf(json.get("joinType").getAsString());
- return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs).setVarOrder(varOrder)
+ return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs)
.setSide(side).setJoinType(join).build();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 8cd25d0..2eae4ff 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -108,6 +108,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
+ * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr>
* <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
* </table>
* </p>
@@ -238,6 +239,7 @@ public class FluoQueryColumns {
public static final Column JOIN_PARENT_NODE_ID = new Column(JOIN_METADATA_CF, "parentNodeId");
public static final Column JOIN_LEFT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "leftChildNodeId");
public static final Column JOIN_RIGHT_CHILD_NODE_ID = new Column(JOIN_METADATA_CF, "rightChildNodeId");
+ public static final Column JOIN_BATCH_SIZE = new Column(JOIN_METADATA_CF, "joinBatchSize");
public static final Column JOIN_BINDING_SET = new Column(JOIN_METADATA_CF, "bindingSet");
// Statement Pattern Metadata columns.
@@ -340,6 +342,7 @@ public class FluoQueryColumns {
JOIN_TYPE,
JOIN_PARENT_NODE_ID,
JOIN_LEFT_CHILD_NODE_ID,
+ JOIN_BATCH_SIZE,
JOIN_RIGHT_CHILD_NODE_ID)),
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 5ba7383..1c34836 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -385,6 +385,7 @@ public class FluoQueryMetadataDAO {
tx.set(rowId, FluoQueryColumns.JOIN_TYPE, metadata.getJoinType().toString() );
tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, metadata.getParentNodeId() );
tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, metadata.getLeftChildNodeId() );
+ tx.set(rowId, FluoQueryColumns.JOIN_BATCH_SIZE, Integer.toString(metadata.getJoinBatchSize()));
tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, metadata.getRightChildNodeId() );
}
@@ -410,6 +411,7 @@ public class FluoQueryMetadataDAO {
FluoQueryColumns.JOIN_TYPE,
FluoQueryColumns.JOIN_PARENT_NODE_ID,
FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID,
+ FluoQueryColumns.JOIN_BATCH_SIZE,
FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
// Return an object holding them.
@@ -421,12 +423,14 @@ public class FluoQueryMetadataDAO {
final String parentNodeId = values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID);
final String leftChildNodeId = values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID);
+ final int joinBatchSize = Integer.parseInt(values.get(FluoQueryColumns.JOIN_BATCH_SIZE));
final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
return JoinMetadata.builder(nodeId)
.setVarOrder(varOrder)
.setJoinType(joinType)
.setParentNodeId(parentNodeId)
+ .setJoinBatchSize(joinBatchSize)
.setLeftChildNodeId(leftChildNodeId)
.setRightChildNodeId(rightChildNodeId);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
index aa79daf..d6c488b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
/**
* Metadata that is specific to Join nodes.
@@ -49,6 +50,9 @@ public class JoinMetadata extends CommonNodeMetadata {
private final String parentNodeId;
private final String leftChildNodeId;
private final String rightChildNodeId;
+ private int joinBatchSize;
+
+ public static final int DEFAULT_JOIN_BATCH_SIZE = 5000;
/**
* Constructs an instance of {@link JoinMetadata}.
@@ -59,6 +63,7 @@ public class JoinMetadata extends CommonNodeMetadata {
* @param parentNodeId - The node id of this node's parent. (not null)
* @param leftChildNodeId - One of the nodes whose results are being joined. (not null)
* @param rightChildNodeId - The other node whose results are being joined. (not null)
+ * @param joinBatchSize - Batch size used to process large joins
*/
public JoinMetadata(
final String nodeId,
@@ -66,12 +71,15 @@ public class JoinMetadata extends CommonNodeMetadata {
final JoinType joinType,
final String parentNodeId,
final String leftChildNodeId,
- final String rightChildNodeId) {
+ final String rightChildNodeId,
+ final int joinBatchSize) {
super(nodeId, varOrder);
this.joinType = checkNotNull(joinType);
this.parentNodeId = checkNotNull(parentNodeId);
this.leftChildNodeId = checkNotNull(leftChildNodeId);
this.rightChildNodeId = checkNotNull(rightChildNodeId);
+ Preconditions.checkArgument(joinBatchSize > 0);
+ this.joinBatchSize = joinBatchSize;
}
/**
@@ -101,6 +109,13 @@ public class JoinMetadata extends CommonNodeMetadata {
public String getRightChildNodeId() {
return rightChildNodeId;
}
+
+    /**
+     * Gets the batch size configured for this join node.
+     *
+     * @return The batch size used to process large joins.
+     */
+    public int getJoinBatchSize() {
+        return this.joinBatchSize;
+    }
@Override
public int hashCode() {
@@ -110,6 +125,7 @@ public class JoinMetadata extends CommonNodeMetadata {
joinType,
parentNodeId,
leftChildNodeId,
+ joinBatchSize,
rightChildNodeId);
}
@@ -127,6 +143,7 @@ public class JoinMetadata extends CommonNodeMetadata {
.append(parentNodeId, joinMetadata.parentNodeId)
.append(leftChildNodeId, joinMetadata.leftChildNodeId)
.append(rightChildNodeId, joinMetadata.rightChildNodeId)
+ .append(joinBatchSize, joinMetadata.joinBatchSize)
.isEquals();
}
return false;
@@ -145,6 +162,7 @@ public class JoinMetadata extends CommonNodeMetadata {
.append(" Parent Node ID: " + parentNodeId + "\n")
.append(" Left Child Node ID: " + leftChildNodeId + "\n")
.append(" Right Child Node ID: " + rightChildNodeId + "\n")
+ .append(" Join Batch Size: " + joinBatchSize + "\n")
.append("}")
.toString();
}
@@ -171,6 +189,7 @@ public class JoinMetadata extends CommonNodeMetadata {
private String parentNodeId;
private String leftChildNodeId;
private String rightChildNodeId;
+ private int joinBatchSize = DEFAULT_JOIN_BATCH_SIZE;
/**
* Constructs an instance of {@link Builder}.
@@ -248,6 +267,16 @@ public class JoinMetadata extends CommonNodeMetadata {
return this;
}
+        /**
+         * Sets the batch size used to process large joins.
+         *
+         * @param joinBatchSize - Batch size used to process large joins. (must be > 0)
+         * @return This builder so that method invocation could be chained.
+         * @throws IllegalArgumentException if {@code joinBatchSize} is not positive.
+         */
+        public Builder setJoinBatchSize(final int joinBatchSize) {
+            // Fail fast at the point of misconfiguration instead of deferring the error to build().
+            Preconditions.checkArgument(joinBatchSize > 0, "The join batch size must be positive, but was %s.", joinBatchSize);
+            this.joinBatchSize = joinBatchSize;
+            return this;
+        }
+
public String getLeftChildNodeId() {
return leftChildNodeId;
}
@@ -266,7 +295,8 @@ public class JoinMetadata extends CommonNodeMetadata {
joinType,
parentNodeId,
leftChildNodeId,
- rightChildNodeId);
+ rightChildNodeId,
+ joinBatchSize);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index fe130fb..e46b405 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -28,6 +29,7 @@ import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -48,6 +50,7 @@ public class QueryMetadata extends CommonNodeMetadata {
private final Set<ExportStrategy> exportStrategy;
private final QueryType queryType;
private final String exportId;
+
/**
* Constructs an instance of {@link QueryMetadata}.
@@ -109,6 +112,7 @@ public class QueryMetadata extends CommonNodeMetadata {
return queryType;
}
+
@Override
public int hashCode() {
return Objects.hashCode(
@@ -178,6 +182,8 @@ public class QueryMetadata extends CommonNodeMetadata {
private String childNodeId;
private Set<ExportStrategy> exportStrategies;
private QueryType queryType;
+ private Optional<Integer> joinBatchSize = Optional.empty();
+
/**
* Constructs an instance of {@link Builder}.
@@ -267,6 +273,22 @@ public class QueryMetadata extends CommonNodeMetadata {
public String getChildNodeId() {
return childNodeId;
}
+
+        /**
+         * Sets the batch size used to process the joins of this query.
+         *
+         * @param joinBatchSize - The batch size used to process joins, or
+         *   {@link Optional#empty()} to leave the join nodes at their default. (not null)
+         * @return This builder so that method invocations may be chained.
+         * @throws NullPointerException if {@code joinBatchSize} is {@code null}.
+         */
+        public Builder setJoinBatchSize(final Optional<Integer> joinBatchSize) {
+            // Reject null here so later isPresent() checks cannot fail far from the cause.
+            this.joinBatchSize = checkNotNull(joinBatchSize);
+            return this;
+        }
+
+        /**
+         * Gets the join batch size configured for this query.
+         *
+         * @return An {@link Optional} holding the batch size used to process
+         *   large joins; empty unless a batch size has been set.
+         */
+        public Optional<Integer> getJoinBatchSize() {
+            return this.joinBatchSize;
+        }
/**
* @return An instance of {@link QueryMetadata} build using this builder's values.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 6c03be1..7bf6f45 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -102,6 +102,8 @@ public class SparqlFluoQueryBuilder {
private TupleExpr te;
private String queryId;
private NodeIds nodeIds;
+ private Optional<Integer> joinBatchSize = Optional.empty();
+
//Default behavior is to export to Kafka - subject to change when user can
//specify their own export strategy
private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka));
@@ -137,6 +139,12 @@ public class SparqlFluoQueryBuilder {
return this;
}
+    /**
+     * Sets the batch size that is handed to the query metadata and from there
+     * to each join node built for this query. If this is never called, the
+     * join nodes keep their own default batch size.
+     *
+     * @param joinBatchSize - The batch size used to process large joins. (must be > 0)
+     * @return This builder so that method invocations may be chained.
+     * @throws IllegalArgumentException if {@code joinBatchSize} is not positive.
+     */
+    public SparqlFluoQueryBuilder setJoinBatchSize(final int joinBatchSize) {
+        Preconditions.checkArgument(joinBatchSize > 0, "The join batch size must be positive, but was %s.", joinBatchSize);
+        this.joinBatchSize = Optional.of(joinBatchSize);
+        return this;
+    }
+
public FluoQuery build() {
Preconditions.checkNotNull(sparql);
Preconditions.checkNotNull(queryId);
@@ -167,6 +175,7 @@ public class SparqlFluoQueryBuilder {
queryBuilder.setSparql(sparql);
queryBuilder.setChildNodeId(childNodeId);
queryBuilder.setExportStrategies(exportStrategies);
+ queryBuilder.setJoinBatchSize(joinBatchSize);
fluoQueryBuilder.setQueryMetadata(queryBuilder);
setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId);
@@ -427,6 +436,9 @@ public class SparqlFluoQueryBuilder {
joinBuilder.setJoinType(joinType);
joinBuilder.setLeftChildNodeId( leftChildNodeId );
joinBuilder.setRightChildNodeId( rightChildNodeId );
+ if(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().isPresent()) {
+ joinBuilder.setJoinBatchSize(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().get());
+ }
// Figure out the variable order for each child node's binding set and
// store it. Also store that each child node's parent is this join.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
index 303f9bb..ac41160 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
@@ -59,4 +59,16 @@ public class FluoQueryUtils {
return queryIdParts[1];
}
+    /**
+     * Walks a {@link FluoQuery} with a {@link NodeIdCollector} visitor and
+     * gathers the node ids of its metadata nodes.
+     *
+     * @param query - The FluoQuery to traverse.
+     * @return The node ids, ordered according to the pre-order traversal of the FluoQuery.
+     */
+    public static List<String> collectNodeIds(final FluoQuery query) {
+        final NodeIdCollector nodeIdCollector = new NodeIdCollector(query);
+        nodeIdCollector.visit();
+        return nodeIdCollector.getNodeIds();
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java
new file mode 100644
index 0000000..6d374d8
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/NodeIdCollector.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadataVisitorBase;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+
+/**
+ * A visitor that does a pre-order traversal of a {@link FluoQuery} and
+ * collects the ids of its metadata query nodes along the way.
+ */
+public class NodeIdCollector extends QueryMetadataVisitorBase {
+
+    /** Node ids, in the order their metadata nodes were visited. */
+    private final List<String> ids = new ArrayList<>();
+
+    /**
+     * Constructs an instance of {@link NodeIdCollector}.
+     *
+     * @param fluoQuery - The query whose metadata nodes will be visited.
+     */
+    public NodeIdCollector(final FluoQuery fluoQuery) {
+        super(fluoQuery);
+    }
+
+    /**
+     * @return The node ids collected so far, in visitation order. This is the
+     *   collector's own list, so it also reflects nodes visited afterwards.
+     */
+    public List<String> getNodeIds() {
+        return ids;
+    }
+
+    // Each visit method records the node's id before calling the base implementation.
+    public void visit(final QueryMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    public void visit(final ProjectionMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    public void visit(final ConstructQueryMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    public void visit(final FilterMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    public void visit(final JoinMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    public void visit(final StatementPatternMetadata metadata) {
+        // Deliberately does not call the base implementation (statement patterns are presumably leaves).
+        ids.add(metadata.getNodeId());
+    }
+
+    public void visit(final PeriodicQueryMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+    public void visit(final AggregationMetadata metadata) {
+        ids.add(metadata.getNodeId());
+        super.visit(metadata);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
index fe89325..210aa0c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
@@ -20,7 +20,6 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
import static org.junit.Assert.assertEquals;
-import java.util.Arrays;
import java.util.Optional;
import org.apache.fluo.api.data.Bytes;
@@ -32,7 +31,6 @@ import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.junit.Test;
import org.openrdf.model.impl.URIImpl;
@@ -62,8 +60,7 @@ public class BatchInformationSerializerTest {
JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update)
.setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346")))
- .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b")))
- .setBs(vBis).build();
+ .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setBs(vBis).build();
byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
index b432868..64504ca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java
@@ -79,25 +79,21 @@ public class QueryBuilderVisitorTest {
}
public void visit(QueryMetadata.Builder queryBuilder) {
- System.out.println(queryBuilder.getNodeId());
ids.add(queryBuilder.getNodeId());
super.visit(queryBuilder);
}
public void visit(ProjectionMetadata.Builder projectionBuilder) {
- System.out.println(projectionBuilder.getNodeId());
ids.add(projectionBuilder.getNodeId());
super.visit(projectionBuilder);
}
public void visit(JoinMetadata.Builder joinBuilder) {
- System.out.println(joinBuilder.getNodeId());
ids.add(joinBuilder.getNodeId());
super.visit(joinBuilder);
}
public void visit(StatementPatternMetadata.Builder statementBuilder) {
- System.out.println(statementBuilder.getNodeId());
ids.add(statementBuilder.getNodeId());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
deleted file mode 100644
index 1707308..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.integration;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.core.client.FluoClientImpl;
-import org.apache.log4j.Logger;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
-import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
-import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
-import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
-import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-public class BatchDeleteIT extends RyaExportITBase {
-
- private static final Logger log = Logger.getLogger(BatchDeleteIT.class);
- private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
-
- @Test
- public void simpleScanDelete() throws Exception {
-
- final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
- + " <urn:predicate_2> ?object2 } ";
- try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
-
- RyaURI subj = new RyaURI("urn:subject_1");
- RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
- RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
- Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
- Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
-
- // Create the PCJ table.
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
- final String pcjId = pcjStorage.createPcj(sparql);
-
- // Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
-
- List<String> ids = getNodeIdStrings(fluoClient, queryId);
- List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
-
- // Stream the data into Fluo.
- InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
-
- // Verify the end results of the query match the expected results.
- getMiniFluo().waitForObservers();
-
- verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 10, 10));
-
- createSpanBatches(fluoClient, ids, prefixes, 10);
- getMiniFluo().waitForObservers();
-
- verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0));
- }
- }
-
- @Test
- public void simpleJoinDelete() throws Exception {
- final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
- + " <urn:predicate_2> ?object2 } ";
- try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
-
- RyaURI subj = new RyaURI("urn:subject_1");
- RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
- RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
- Set<RyaStatement> statements1 = getRyaStatements(statement1, 5);
- Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
-
- // Create the PCJ table.
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
- final String pcjId = pcjStorage.createPcj(sparql);
-
- // Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
-
- List<String> ids = getNodeIdStrings(fluoClient, queryId);
- String joinId = ids.get(1);
- String rightSp = ids.get(3);
- QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("subject", new URIImpl("urn:subject_1"));
- bs.addBinding("object1", new URIImpl("urn:object_0"));
- VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
- Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
- VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
-
- // Stream the data into Fluo.
- InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
-
- getMiniFluo().waitForObservers();
- verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 5, 5));
-
- JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
- .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete)
- .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
- // Verify the end results of the query match the expected results.
- createSpanBatch(fluoClient, joinId, batch);
-
- getMiniFluo().waitForObservers();
- verifyCounts(fluoClient, ids, Arrays.asList(25, 20, 5, 5));
- }
- }
-
- @Test
- public void simpleJoinAdd() throws Exception {
- final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
- + " <urn:predicate_2> ?object2 } ";
- try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
-
- RyaURI subj = new RyaURI("urn:subject_1");
- RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
- Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
-
- // Create the PCJ table.
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
- final String pcjId = pcjStorage.createPcj(sparql);
-
- // Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName());
-
- List<String> ids = getNodeIdStrings(fluoClient, queryId);
- String joinId = ids.get(1);
- String rightSp = ids.get(3);
- QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("subject", new URIImpl("urn:subject_1"));
- bs.addBinding("object1", new URIImpl("urn:object_0"));
- VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
- Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
- VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2"));
-
- // Stream the data into Fluo.
- InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
-
- getMiniFluo().waitForObservers();
- verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 5));
-
- JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
- .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add)
- .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build();
- // Verify the end results of the query match the expected results.
- createSpanBatch(fluoClient, joinId, batch);
-
- getMiniFluo().waitForObservers();
- verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 0, 5));
- }
- }
-
- private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
-
- Set<RyaStatement> statements = new HashSet<>();
- final String subject = "urn:subject_";
- final String predicate = "urn:predicate_";
- final String object = "urn:object_";
-
- for (int i = 0; i < numTriples; i++) {
- RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject());
- if (stmnt.getSubject() == null) {
- stmnt.setSubject(new RyaURI(subject + i));
- }
- if (stmnt.getPredicate() == null) {
- stmnt.setPredicate(new RyaURI(predicate + i));
- }
- if (stmnt.getObject() == null) {
- stmnt.setObject(new RyaURI(object + i));
- }
- statements.add(stmnt);
- }
- return statements;
- }
-
- private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
- List<String> nodeStrings = new ArrayList<>();
- try (Snapshot sx = fluoClient.newSnapshot()) {
- FluoQuery query = dao.readFluoQuery(sx, queryId);
- nodeStrings.add(queryId);
- Collection<JoinMetadata> jMeta = query.getJoinMetadata();
- for (JoinMetadata meta : jMeta) {
- nodeStrings.add(meta.getNodeId());
- nodeStrings.add(meta.getLeftChildNodeId());
- nodeStrings.add(meta.getRightChildNodeId());
- }
- }
- return nodeStrings;
- }
-
- private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) {
-
- Preconditions.checkArgument(ids.size() == prefixes.size());
-
- try (Transaction tx = fluoClient.newTransaction()) {
- for (int i = 0; i < ids.size(); i++) {
- String id = ids.get(i);
- String bsPrefix = prefixes.get(i);
- NodeType type = NodeType.fromNodeId(id).get();
- Column bsCol = type.getResultColumn();
- String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
- Span span = Span.prefix(Bytes.of(row));
- BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
- .build();
- BatchInformationDAO.addBatch(tx, id, batch);
- }
- tx.commit();
- }
- }
-
- private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) {
- try (Transaction tx = fluoClient.newTransaction()) {
- BatchInformationDAO.addBatch(tx, nodeId, batch);
- tx.commit();
- }
- }
-
- private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
- try (Transaction tx = fluoClient.newTransaction()) {
- int count = 0;
- RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
- Iterator<ColumnScanner> colScanners = scanner.iterator();
- while (colScanners.hasNext()) {
- ColumnScanner colScanner = colScanners.next();
- Iterator<ColumnValue> vals = colScanner.iterator();
- while (vals.hasNext()) {
- vals.next();
- count++;
- }
- }
- tx.commit();
- return count;
- }
- }
-
- private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) {
- Preconditions.checkArgument(ids.size() == expectedCounts.size());
- for (int i = 0; i < ids.size(); i++) {
- String id = ids.get(i);
- int expected = expectedCounts.get(i);
- NodeType type = NodeType.fromNodeId(id).get();
- int count = countResults(fluoClient, id, type.getResultColumn());
- log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
- switch (type) {
- case STATEMENT_PATTERN:
- assertEquals(expected, count);
- break;
- case JOIN:
- assertEquals(expected, count);
- break;
- case QUERY:
- assertEquals(expected, count);
- break;
- default:
- break;
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ad60aca8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
new file mode 100644
index 0000000..32d0e41
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Integration tests for batch processing in the Fluo PCJ application. They cover:
+ * <ul>
+ * <li>span-based batch deletes ({@link SpanBatchDeleteInformation});</li>
+ * <li>join batch adds and deletes ({@link JoinBatchInformation});</li>
+ * <li>queries created with explicit statement-pattern and join batch sizes.</li>
+ * </ul>
+ * <p>
+ * Each test checks per-node result counts. The expected-count lists passed to
+ * {@link #verifyCounts(FluoClient, List, List)} are ordered to match the node ids returned by
+ * {@link FluoQueryUtils#collectNodeIds(FluoQuery)}.
+ */
+public class BatchIT extends RyaExportITBase {
+
+    private static final Logger log = Logger.getLogger(BatchIT.class);
+    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+    /**
+     * Loads a two-pattern join, then schedules a span batch delete for every node and verifies that all
+     * result rows are removed.
+     */
+    @Test
+    public void simpleScanDelete() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            // Binding-set row prefixes, one per node id, that the span batch deletes scan over.
+            List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            // Both sets of 10 triples share ?subject, so the join yields 10 x 10 = 100 results.
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 10, 10));
+
+            // Delete every node's results in batches of 10 and verify nothing is left.
+            createSpanBatches(fluoClient, ids, prefixes, 10);
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 0));
+        }
+    }
+
+    /**
+     * Loads a 5 x 5 join, then schedules a join batch delete for the left binding set
+     * {@code {subject=urn:subject_1, object1=urn:object_0}} against the right statement pattern, and
+     * verifies that the join node loses the 5 results built from that binding set.
+     */
+    @Test
+    public void simpleJoinDelete() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 5);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            String joinId = ids.get(2);
+            String rightSp = ids.get(4);
+            QueryBindingSet bs = new QueryBindingSet();
+            bs.addBinding("subject", new URIImpl("urn:subject_1"));
+            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+            // Scan the right statement pattern's rows whose binding sets start with urn:subject_1.
+            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 25, 5, 5));
+
+            // Schedule the join batch delete.
+            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete)
+                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).build();
+            createSpanBatch(fluoClient, joinId, batch);
+
+            // Verify the end results of the query match the expected results.
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 20, 5, 5));
+        }
+    }
+
+    /**
+     * Loads only the right-hand statements, then schedules a join batch add for the left binding set
+     * {@code {subject=urn:subject_1, object1=urn:object_0}} and verifies that it joins with the 5 right
+     * results.
+     */
+    @Test
+    public void simpleJoinAdd() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 5);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ.
+            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+            String joinId = ids.get(2);
+            String rightSp = ids.get(4);
+            QueryBindingSet bs = new QueryBindingSet();
+            bs.addBinding("subject", new URIImpl("urn:subject_1"));
+            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+            // Scan the right statement pattern's rows whose binding sets start with urn:subject_1.
+            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 5));
+
+            // Schedule the join batch add.
+            JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1)
+                    .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add)
+                    .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).build();
+            createSpanBatch(fluoClient, joinId, batch);
+
+            // Verify the end results of the query match the expected results.
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 5, 0, 5));
+        }
+    }
+
+    /**
+     * Runs a 15 x 15 natural join with statement-pattern and join batch sizes of 5, so results must be
+     * produced across several batches.
+     */
+    @Test
+    public void joinBatchIntegrationTest() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 15);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 15);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
+            // batch size of joins to 5.
+            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(225, 225, 225, 15, 15));
+        }
+    }
+
+    /**
+     * Runs an OPTIONAL (left) join with batch sizes of 5.
+     * <p>
+     * urn:subject_1 has 10 left and 10 right triples, giving 100 joined results. urn:subject_2 has 10 left
+     * triples and no right match, giving 10 results that keep only their left bindings.
+     */
+    @Test
+    public void leftJoinBatchIntegrationTest() throws Exception {
+        final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
+                + "OPTIONAL{ ?subject <urn:predicate_2> ?object2} } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
+
+            subj = new RyaURI("urn:subject_2");
+            RyaStatement statement3 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
+
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+            Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
+            // batch size of joins to 5.
+            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements3, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(110, 110, 110, 20, 10));
+        }
+    }
+
+    /**
+     * Runs a query with two chained joins (three statement patterns) and batch sizes of 5.
+     */
+    @Test
+    public void multiJoinBatchIntegrationTest() throws Exception {
+        final String sparql = "SELECT ?subject1 ?subject2 ?object1 ?object2 WHERE { ?subject1 <urn:predicate_1> ?object1; "
+                + " <urn:predicate_2> ?object2 ."
+                + " ?subject2 <urn:predicate_3> ?object2 } ";
+        try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
+
+            RyaURI subj1 = new RyaURI("urn:subject_1");
+            RyaStatement statement1 = new RyaStatement(subj1, new RyaURI("urn:predicate_1"), null);
+            RyaStatement statement2 = new RyaStatement(subj1, new RyaURI("urn:predicate_2"), null);
+
+            Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
+            Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
+
+            RyaURI subj2 = new RyaURI("urn:subject_2");
+            RyaStatement statement3 = new RyaStatement(subj2, new RyaURI("urn:predicate_3"), null);
+            Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
+
+            // Create the PCJ table.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName());
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
+            // batch size of joins to 5.
+            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
+                    getRyaInstanceName());
+
+            List<String> ids = getNodeIdStrings(fluoClient, queryId);
+
+            // Stream the data into Fluo.
+            InsertTriples inserter = new InsertTriples();
+            inserter.insert(fluoClient, statements1, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements3, Optional.<String> absent());
+
+            getMiniFluo().waitForObservers();
+            verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 100, 10, 10, 10));
+        }
+    }
+
+    /**
+     * Builds {@code numTriples} statements from a template.
+     * <p>
+     * The subject, predicate and object of {@code statement} are copied as-is. Any null position is filled
+     * with {@code urn:subject_i}, {@code urn:predicate_i} or {@code urn:object_i}, where {@code i} runs from
+     * 0 to {@code numTriples - 1}.
+     *
+     * @param statement - template statement; null positions are generated.
+     * @param numTriples - number of statements to build.
+     * @return the generated statements.
+     */
+    private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
+        final String subject = "urn:subject_";
+        final String predicate = "urn:predicate_";
+        final String object = "urn:object_";
+
+        Set<RyaStatement> statements = new HashSet<>();
+        for (int i = 0; i < numTriples; i++) {
+            RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject());
+            if (stmnt.getSubject() == null) {
+                stmnt.setSubject(new RyaURI(subject + i));
+            }
+            if (stmnt.getPredicate() == null) {
+                stmnt.setPredicate(new RyaURI(predicate + i));
+            }
+            if (stmnt.getObject() == null) {
+                stmnt.setObject(new RyaURI(object + i));
+            }
+            statements.add(stmnt);
+        }
+        return statements;
+    }
+
+    /**
+     * Reads the query's metadata and collects the ids of all of its nodes.
+     *
+     * @param fluoClient - client used to open a read-only snapshot.
+     * @param queryId - id of the query to read.
+     * @return the node ids, in the order returned by {@link FluoQueryUtils#collectNodeIds(FluoQuery)}.
+     */
+    private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
+        try (Snapshot sx = fluoClient.newSnapshot()) {
+            FluoQuery query = dao.readFluoQuery(sx, queryId);
+            return FluoQueryUtils.collectNodeIds(query);
+        }
+    }
+
+    /**
+     * Schedules one {@link SpanBatchDeleteInformation} per node id, all in a single transaction.
+     * <p>
+     * Each batch targets the node type's result column. It covers the rows prefixed by
+     * {@code id + NODEID_BS_DELIM + prefix}.
+     *
+     * @param fluoClient - client used to write the batch information.
+     * @param ids - node ids to schedule deletes for.
+     * @param prefixes - binding-set row prefix for the node id at the same index.
+     * @param batchSize - batch size recorded in each batch.
+     */
+    private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) {
+        Preconditions.checkArgument(ids.size() == prefixes.size(),
+                "ids (%s) and prefixes (%s) must be the same size.", ids.size(), prefixes.size());
+
+        try (Transaction tx = fluoClient.newTransaction()) {
+            for (int i = 0; i < ids.size(); i++) {
+                String id = ids.get(i);
+                String bsPrefix = prefixes.get(i);
+                NodeType type = NodeType.fromNodeId(id).get();
+                Column bsCol = type.getResultColumn();
+                String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
+                Span span = Span.prefix(Bytes.of(row));
+                BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
+                        .build();
+                BatchInformationDAO.addBatch(tx, id, batch);
+            }
+            tx.commit();
+        }
+    }
+
+    /**
+     * Writes a single batch for {@code nodeId} in its own transaction.
+     */
+    private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) {
+        try (Transaction tx = fluoClient.newTransaction()) {
+            BatchInformationDAO.addBatch(tx, nodeId, batch);
+            tx.commit();
+        }
+    }
+
+    /**
+     * Counts the values stored in {@code bsColumn} across all rows whose key starts with {@code nodeId}.
+     * <p>
+     * This is a pure read, so it uses a read-only snapshot rather than a transaction that is committed
+     * without writing anything.
+     *
+     * @param fluoClient - client used to open the snapshot.
+     * @param nodeId - row prefix to scan.
+     * @param bsColumn - the only column fetched and counted.
+     * @return the number of column values found.
+     */
+    private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
+        try (Snapshot sx = fluoClient.newSnapshot()) {
+            int count = 0;
+            RowScanner scanner = sx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
+            for (ColumnScanner colScanner : scanner) {
+                for (ColumnValue ignored : colScanner) {
+                    count++;
+                }
+            }
+            return count;
+        }
+    }
+
+    /**
+     * Asserts the number of results stored for each query node.
+     *
+     * @param fluoClient - client used to read the results.
+     * @param ids - node ids, as returned by {@link #getNodeIdStrings(FluoClient, String)}.
+     * @param expectedCounts - expected result count for the node id at the same index in {@code ids}.
+     *        Only STATEMENT_PATTERN, JOIN and QUERY nodes are asserted; entries for any other node type
+     *        are ignored.
+     */
+    private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) {
+        Preconditions.checkArgument(ids.size() == expectedCounts.size(),
+                "ids (%s) and expectedCounts (%s) must be the same size.", ids.size(), expectedCounts.size());
+        for (int i = 0; i < ids.size(); i++) {
+            final String id = ids.get(i);
+            final int expected = expectedCounts.get(i);
+            final NodeType type = NodeType.fromNodeId(id).get();
+            final int count = countResults(fluoClient, id, type.getResultColumn());
+            log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
+            switch (type) {
+                case STATEMENT_PATTERN:
+                case JOIN:
+                case QUERY:
+                    // Name the node so a failure identifies which count was wrong.
+                    assertEquals("Unexpected result count for " + type + " node " + id, expected, count);
+                    break;
+                default:
+                    // Counts for other node types are intentionally not asserted.
+                    break;
+            }
+        }
+    }
+
+}