You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:02:00 UTC
[7/9] incubator-rya git commit: RYA-280-Periodic Query Service.
Closes #177.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java
new file mode 100644
index 0000000..581aa5b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchRowKeyUtil.java
@@ -0,0 +1,68 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.UUID;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility for building and parsing the Fluo row keys ({@link Bytes}) used to identify each
+ * {@link BatchInformation} object. Each row is formed by concatenating a query node id, the
+ * delimiter {@link IncrementalUpdateConstants#NODEID_BS_DELIM}, and a batch id.
+ */
+public class BatchRowKeyUtil {
+
+    /**
+     * Creates a row from the query node id. The batch id is automatically generated
+     * from a random {@link UUID} with its dashes removed.
+     * @param nodeId - query node id that batch task will be performed on
+     * @return Bytes row used to identify the BatchInformation
+     */
+    public static Bytes getRow(String nodeId) {
+        return getRow(nodeId, UUID.randomUUID().toString().replace("-", ""));
+    }
+
+    /**
+     * Creates a row from a nodeId and batchId.
+     * @param nodeId - query node id that batch task will be performed on
+     * @param batchId - id used to identify batch
+     * @return Bytes row used to identify the BatchInformation
+     */
+    public static Bytes getRow(String nodeId, String batchId) {
+        return Bytes.of(nodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + batchId);
+    }
+
+    /**
+     * Given a row built by this class, returns the query node id.
+     * @param row - the Bytes row used to identify the BatchInformation
+     * @return the node id that the batch task is performed on
+     * @throws IllegalArgumentException if the row does not split into exactly a node id and a batch id
+     */
+    public static String getNodeId(Bytes row) {
+        // String.split takes a regex, so quote the delimiter to match it literally.
+        String delim = java.util.regex.Pattern.quote(IncrementalUpdateConstants.NODEID_BS_DELIM);
+        String[] parts = row.toString().split(delim);
+        Preconditions.checkArgument(parts.length == 2, "Row is not of the form nodeId + delimiter + batchId: %s", row);
+        return parts[0];
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
new file mode 100644
index 0000000..a266341
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
@@ -0,0 +1,184 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Batch updater for {@link JoinBatchInformation}: joins a BindingSet with scanned sibling results and adds or deletes the join results in batches.
+ */
+public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
+
+ private static final Logger log = Logger.getLogger(JoinBatchBindingSetUpdater.class);
+ private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+ private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+ /**
+ * Processes a {@link JoinBatchInformation}. Scans up to
+ * {@link JoinBatchInformation#getBatchSize()} sibling BindingSets over the batch's
+ * Span, joins them with {@link JoinBatchInformation#getBs()}, and applies
+ * {@link JoinBatchInformation#getTask()} (Add or Delete; Update is only logged) to
+ * each join result in the JOIN_BINDING_SET column of the node taken from the row.
+ * If the batch limit was hit, the Span is replaced via getNewSpan and the batch is re-added.
+ * @param tx - Fluo transaction in which the batch is processed
+ * @param row - batch row; the node id is parsed from it with {@link BatchRowKeyUtil#getNodeId(Bytes)}
+ * @param batch - must be a {@link JoinBatchInformation}
+ * @throws Exception if the batch is not a JoinBatchInformation, the join type is unsupported, or processing fails
+ */
+ @Override
+ public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception {
+ super.processBatch(tx, row, batch);
+ String nodeId = BatchRowKeyUtil.getNodeId(row);
+ Preconditions.checkArgument(batch instanceof JoinBatchInformation);
+ JoinBatchInformation joinBatch = (JoinBatchInformation) batch;
+ Task task = joinBatch.getTask();
+
+ // Figure out which join algorithm we are going to use.
+ final IterativeJoin joinAlgorithm;
+ switch (joinBatch.getJoinType()) {
+ case NATURAL_JOIN:
+ joinAlgorithm = new NaturalJoin();
+ break;
+ case LEFT_OUTER_JOIN:
+ joinAlgorithm = new LeftOuterJoin();
+ break;
+ default:
+ throw new RuntimeException("Unsupported JoinType: " + joinBatch.getJoinType());
+ }
+
+ Set<VisibilityBindingSet> bsSet = new HashSet<>();
+ Optional<RowColumn> rowCol = fillSiblingBatch(tx, joinBatch, bsSet);
+
+ // Iterates over the resulting BindingSets from the join.
+ final Iterator<VisibilityBindingSet> newJoinResults;
+ VisibilityBindingSet bs = joinBatch.getBs();
+ if (joinBatch.getSide() == Side.LEFT) {
+ newJoinResults = joinAlgorithm.newLeftResult(bs, bsSet.iterator());
+ } else {
+ newJoinResults = joinAlgorithm.newRightResult(bsSet.iterator(), bs);
+ }
+
+ // Insert the new join binding sets to the Fluo table.
+ final JoinMetadata joinMetadata = dao.readJoinMetadata(tx, nodeId);
+ final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
+ while (newJoinResults.hasNext()) {
+ final VisibilityBindingSet newJoinResult = newJoinResults.next();
+ //create BindingSet value
+ Bytes bsBytes = BS_SERDE.serialize(newJoinResult);
+ //make rowId
+ Bytes rowKey = RowKeyUtil.makeRowKey(nodeId, joinVarOrder, newJoinResult);
+ final Column col = FluoQueryColumns.JOIN_BINDING_SET;
+ processTask(tx, task, rowKey, col, bsBytes);
+ }
+
+ // if batch limit met, there are additional entries to process
+ // update the span and register updated batch job
+ if (rowCol.isPresent()) {
+ Span newSpan = getNewSpan(rowCol.get(), joinBatch.getSpan());
+ joinBatch.setSpan(newSpan);
+ BatchInformationDAO.addBatch(tx, nodeId, joinBatch);
+ }
+ }
+
+ /** Applies the task to one join result: Add sets value at row/column, Delete deletes row/column;
+ * Update (not supported here) and any other task are only logged at trace level and write nothing. */
+ private void processTask(TransactionBase tx, Task task, Bytes row, Column column, Bytes value) {
+ switch (task) {
+ case Add:
+ tx.set(row, column, value);
+ break;
+ case Delete:
+ tx.delete(row, column);
+ break;
+ case Update:
+ log.trace("The Task Update is not supported for JoinBatchBindingSetUpdater. Batch will not be processed.");
+ break;
+ default:
+ log.trace("Invalid Task type. Aborting batch operation.");
+ break;
+ }
+ }
+
+ /**
+ * Fetches batch to be processed by scanning over the Span specified by the
+ * {@link JoinBatchInformation}. Deserialized results are added to bsSet, and the
+ * scan stops once bsSet holds as many entries as the batch size.
+ *
+ * @param tx - Fluo transaction in which batch operation is performed
+ * @param batch - batch order to be processed
+ * @param bsSet - set that batch results are added to
+ * @return the row (paired with the batch Column) where the scan stopped if the batch limit was met; otherwise empty
+ * @throws Exception if scanning or deserializing a value fails
+ */
+ private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, JoinBatchInformation batch, Set<VisibilityBindingSet> bsSet) throws Exception {
+ Span span = batch.getSpan();
+ Column column = batch.getColumn();
+ int batchSize = batch.getBatchSize();
+
+ RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build();
+ Iterator<ColumnScanner> colScannerIter = rs.iterator();
+ // row tracks the most recent row reached by the scanner; it is returned as the stopping point when the limit is met
+ boolean batchLimitMet = false;
+ Bytes row = span.getStart().getRow();
+ while (colScannerIter.hasNext() && !batchLimitMet) {
+ ColumnScanner colScanner = colScannerIter.next();
+ row = colScanner.getRow();
+ Iterator<ColumnValue> iter = colScanner.iterator();
+ while (iter.hasNext()) {
+ if (bsSet.size() >= batchSize) {
+ batchLimitMet = true;
+ break;
+ }
+ bsSet.add(BS_SERDE.deserialize(iter.next().getValue()));
+ }
+ }
+
+ if (batchLimitMet) {
+ return Optional.of(new RowColumn(row, column));
+ } else {
+ return Optional.empty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
new file mode 100644
index 0000000..71ac557
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -0,0 +1,255 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class represents a batch order to perform a given {@link Task} (Add, Delete, or
+ * Update) on join BindingSet results. The join has two children. For one child, a
+ * VisibilityBindingSet is specified along with the Side of that child; this BindingSet
+ * represents an update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified so that the sibling node (the node that wasn't updated)
+ * can be scanned to obtain results that can be joined with the VisibilityBindingSet.
+ * The assumption here is that the Span is derived from the {@link Binding}s of common
+ * variables between the join children, with Values ordered according to the indicated
+ * {@link VariableOrder}. This batch order is processed by the {@link BatchObserver} and
+ * used with the nodeId provided to the Observer to process the Task specified by the
+ * batch order. If the Task is to add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for results
+ * using the indicated Span and Column. These results are joined with the indicated
+ * VisibilityBindingSet, and the results are added to the parent join. Delete is performed
+ * analogously; {@link JoinBatchBindingSetUpdater} only logs Update and does not apply it.
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+    private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater();
+    private final VisibilityBindingSet bs; //update for join child indicated by side
+    private final VariableOrder varOrder; //variable order for child indicated by Span
+    private final Side side; //join child that was updated by bs
+    private final JoinType join;
+
+    /**
+     * @param batchSize - batch size that Tasks are performed in
+     * @param task - Add, Delete, or Update
+     * @param column - Column of join child to be scanned
+     * @param span - span of join child to be scanned (derived from common variables of left and right join children)
+     * @param bs - BindingSet to be joined with results of child scan
+     * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span)
+     * @param side - The side of the child that the VisibilityBindingSet update occurred at
+     * @param join - JoinType (left, right, natural inner)
+     * @throws NullPointerException if bs, varOrder, side, or join is null
+     */
+    public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+        super(batchSize, task, column, span);
+        this.bs = Objects.requireNonNull(bs, "bs");
+        this.varOrder = Objects.requireNonNull(varOrder, "varOrder");
+        this.side = Objects.requireNonNull(side, "side");
+        this.join = Objects.requireNonNull(join, "join");
+    }
+
+    /** Same as the full constructor, with the batch size set to DEFAULT_BATCH_SIZE. */
+    public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+        this(DEFAULT_BATCH_SIZE, task, column, span, bs, varOrder, side, join);
+    }
+
+    /**
+     * Indicates the join child that the BindingSet result {@link JoinBatchInformation#getBs()} updated.
+     * This BindingSet is joined with the results obtained by scanning over the value of {@link JoinBatchInformation#getSpan()}.
+     * @return {@link Side} indicating which side new result occurred on in join
+     */
+    public Side getSide() {
+        return side;
+    }
+
+    /**
+     * @return {@link JoinType} indicating type of join (left join, right join, natural inner join,...)
+     */
+    public JoinType getJoinType() {
+        return join;
+    }
+
+    /**
+     * Returns the VariableOrder for the join child corresponding to the Span.
+     * @return {@link VariableOrder} used to join {@link VisibilityBindingSet}s.
+     */
+    public VariableOrder getVarOrder() {
+        return varOrder;
+    }
+
+    /**
+     * Gets the VisibilityBindingSet that represents an update to the join child. The join child
+     * updated is indicated by the value of {@link JoinBatchInformation#getSide()}.
+     * @return VisibilityBindingSet that will be joined with results returned by scan over given
+     * {@link Span}.
+     */
+    public VisibilityBindingSet getBs() {
+        return bs;
+    }
+
+    /**
+     * @return BatchBindingSetUpdater used to apply {@link Task} to results formed by joining the given
+     * VisibilityBindingSet with the results returned by scanning over the Span.
+     */
+    @Override
+    public BatchBindingSetUpdater getBatchUpdater() {
+        return updater;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("Join Batch Information {\n")
+                .append(" Batch Size: " + super.getBatchSize() + "\n")
+                .append(" Task: " + super.getTask() + "\n")
+                .append(" Column: " + super.getColumn() + "\n")
+                .append(" Span: " + super.getSpan() + "\n")
+                .append(" VariableOrder: " + varOrder + "\n")
+                .append(" Join Type: " + join + "\n")
+                .append(" Join Side: " + side + "\n")
+                .append(" Binding Set: " + bs + "\n")
+                .append("}")
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof JoinBatchInformation)) {
+            return false;
+        }
+
+        JoinBatchInformation batch = (JoinBatchInformation) other;
+        return super.equals(other) && Objects.equals(this.bs, batch.bs) && Objects.equals(this.join, batch.join)
+                && Objects.equals(this.side, batch.side) && Objects.equals(this.varOrder, batch.varOrder);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.getBatchSize(), super.getColumn(), super.getSpan(), super.getTask(), bs, join, side, varOrder);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private int batchSize = DEFAULT_BATCH_SIZE;
+        private Task task;
+        private Column column;
+        private Span span;
+        private VisibilityBindingSet bs;
+        private VariableOrder varOrder;
+        private JoinType join;
+        private Side side;
+
+        /**
+         * @param batchSize - batch size that {@link Task}s are performed in
+         */
+        public Builder setBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        /**
+         * @param task - Task performed (Add, Delete, Update)
+         */
+        public Builder setTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        /**
+         * @param column - Column of join child to be scanned
+         */
+        public Builder setColumn(Column column) {
+            this.column = column;
+            return this;
+        }
+
+        /**
+         * Span to scan results for one child of the join. The Span corresponds to the side of
+         * the join that is not indicated by Side. So if Side is Left, then the
+         * Span will scan the right child of the join. It is assumed that the span is derived from
+         * the common variables of the left and right join children.
+         * @param span - Span over join child to be scanned
+         */
+        public Builder setSpan(Span span) {
+            this.span = span;
+            return this;
+        }
+
+        /**
+         * Sets the BindingSet that corresponds to an update to the join child indicated
+         * by Side.
+         * @param bs - BindingSet update of join child to be joined with results of scan
+         */
+        public Builder setBs(VisibilityBindingSet bs) {
+            this.bs = bs;
+            return this;
+        }
+
+        /**
+         * @param join - JoinType (left, right, natural inner)
+         */
+        public Builder setJoinType(JoinType join) {
+            this.join = join;
+            return this;
+        }
+
+        /**
+         * Indicates the join child corresponding to the VisibilityBindingSet update
+         * @param side - side of join the child BindingSet update appeared at
+         */
+        public Builder setSide(Side side) {
+            this.side = side;
+            return this;
+        }
+
+        /**
+         * Sets the variable order for the join child corresponding to the Span
+         * @param varOrder - Variable order used to join BindingSet with result of scan
+         */
+        public Builder setVarOrder(VariableOrder varOrder) {
+            this.varOrder = varOrder;
+            return this;
+        }
+
+        /**
+         * @return an instance of {@link JoinBatchInformation} constructed from the parameters passed to this Builder
+         * @throws NullPointerException if the BindingSet, VariableOrder, Side, or JoinType was never set
+         */
+        public JoinBatchInformation build() {
+            return new JoinBatchInformation(batchSize, task, column, span, bs, varOrder, side, join);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
new file mode 100644
index 0000000..749a77d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
@@ -0,0 +1,128 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class processes {@link SpanBatchDeleteInformation} objects by
+ * deleting the entries in the Fluo Column corresponding to the {@link Span}
+ * of the BatchInformation object. This class will delete entries until the
+ * batch size is met, and then register the same SpanBatchDeleteInformation
+ * again with an updated Span derived from the stopping point of this
+ * batch. If the batch limit is not met, then a new batch is not created and
+ * the task is complete.
+ */
+public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
+
+    private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class);
+
+    /**
+     * Process SpanBatchDeleteInformation objects by deleting all entries indicated
+     * by Span until batch limit is met. Only the Delete task is applied; other tasks are logged.
+     * @param tx - Fluo Transaction
+     * @param row - Byte row identifying BatchInformation
+     * @param batch - SpanBatchDeleteInformation object to be processed
+     * @throws Exception if batch is not a SpanBatchDeleteInformation or the scan or delete fails
+     */
+    @Override
+    public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception {
+        super.processBatch(tx, row, batch);
+        Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation);
+        SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch;
+        Task task = spanBatch.getTask();
+        int batchSize = spanBatch.getBatchSize();
+        Span span = spanBatch.getSpan();
+        Column column = batch.getColumn();
+        Optional<RowColumn> rowCol = Optional.empty();
+
+        switch (task) {
+        case Add:
+            log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed.");
+            break;
+        case Delete:
+            rowCol = deleteBatch(tx, span, column, batchSize);
+            break;
+        case Update:
+            log.trace("The Task Update is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed.");
+            break;
+        default:
+            log.trace("Invalid Task type. Aborting batch operation.");
+            break;
+        }
+
+        if (rowCol.isPresent()) {
+            Span newSpan = getNewSpan(rowCol.get(), spanBatch.getSpan());
+            log.trace("Batch size met. There are remaining results that need to be deleted. Creating a new batch of size: "
+                    + spanBatch.getBatchSize() + " with Span: " + newSpan + " and Column: " + column);
+            spanBatch.setSpan(newSpan);
+            BatchInformationDAO.addBatch(tx, BatchRowKeyUtil.getNodeId(row), spanBatch);
+        }
+    }
+
+    /**
+     * Deletes at most batchSize entries of the Column found by scanning the Span. Failures are not
+     * caught here: swallowing them would report the batch as finished and leave entries undeleted.
+     * @return the row at which the scan stopped if the batch limit was met and entries may
+     *         remain; otherwise empty, meaning the scan reached the end of the Span
+     */
+    private Optional<RowColumn> deleteBatch(TransactionBase tx, Span span, Column column, int batchSize) {
+        log.trace("Deleting batch of size: " + batchSize + " using Span: " + span + " and Column: " + column);
+        RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build();
+        Iterator<ColumnScanner> colScannerIter = rs.iterator();
+
+        int count = 0;
+        boolean batchLimitMet = false;
+        Bytes row = span.getStart().getRow(); // most recent row reached; returned as the stopping point
+        while (colScannerIter.hasNext() && !batchLimitMet) {
+            ColumnScanner colScanner = colScannerIter.next();
+            row = colScanner.getRow();
+            Iterator<ColumnValue> iter = colScanner.iterator();
+            while (iter.hasNext()) {
+                if (count >= batchSize) {
+                    batchLimitMet = true;
+                    break;
+                }
+                ColumnValue colVal = iter.next();
+                tx.delete(row, colVal.getColumn());
+                count++;
+            }
+        }
+
+        if (batchLimitMet) {
+            return Optional.of(new RowColumn(row));
+        } else {
+            return Optional.empty();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
new file mode 100644
index 0000000..3b1e245
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
@@ -0,0 +1,95 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * A batch order to delete the entries of the Fluo table indicated by the given Span and
+ * Column. The {@link Task} of this order is always Delete, and its updater is a
+ * {@link SpanBatchBindingSetUpdater}. These batch orders are processed by the
+ * {@link BatchObserver}, which uses this batch information along with the nodeId passed
+ * into the Observer to perform batch deletes.
+ */
+public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
+
+    private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater();
+
+    /**
+     * @param batchSize - size of the batches that the delete is applied in
+     * @param column - Column that the delete is applied to
+     * @param span - Span that the delete is applied to
+     */
+    public SpanBatchDeleteInformation(int batchSize, Column column, Span span) {
+        super(batchSize, Task.Delete, column, span);
+    }
+
+    /** @return a new {@link Builder} whose batch size starts at DEFAULT_BATCH_SIZE */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column} */
+    @Override
+    public BatchBindingSetUpdater getBatchUpdater() {
+        return updater;
+    }
+
+    /** Builder that collects the batch size, Column, and Span of a {@link SpanBatchDeleteInformation}. */
+    public static class Builder {
+
+        private Span targetSpan;
+        private Column targetColumn;
+        private int size = DEFAULT_BATCH_SIZE;
+
+        /**
+         * @param span - span that batch {@link Task} will be applied to
+         * @return this Builder
+         */
+        public Builder setSpan(Span span) {
+            targetSpan = span;
+            return this;
+        }
+
+        /**
+         * @param column - column batch {@link Task} will be applied to
+         * @return this Builder
+         */
+        public Builder setColumn(Column column) {
+            targetColumn = column;
+            return this;
+        }
+
+        /**
+         * @param batchSize - {@link Task}s are applied in batches of this size
+         * @return this Builder
+         */
+        public Builder setBatchSize(int batchSize) {
+            size = batchSize;
+            return this;
+        }
+
+        /**
+         * @return an instance of {@link SpanBatchDeleteInformation} constructed from parameters passed to this Builder
+         */
+        public SpanBatchDeleteInformation build() {
+            return new SpanBatchDeleteInformation(size, targetColumn, targetSpan);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java
new file mode 100644
index 0000000..e6f69d0
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializer.java
@@ -0,0 +1,58 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Optional;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Serializer/Deserializer for {@link BatchInformation} objects that uses the Gson
+ * Type Adapter {@link BatchInformationTypeAdapter} to do all of the serializing and deserializing.
+ *
+ *
+ */
+public class BatchInformationSerializer {
+
+ private static Logger log = Logger.getLogger(BatchInformationSerializer.class);
+ private static Gson gson = new GsonBuilder().registerTypeHierarchyAdapter(BatchInformation.class, new BatchInformationTypeAdapter())
+ .create();
+
+ public static byte[] toBytes(BatchInformation arg0) {
+ try {
+ return gson.toJson(arg0).getBytes("UTF-8");
+ } catch (Exception e) {
+ log.info("Unable to serialize BatchInformation: " + arg0);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static Optional<BatchInformation> fromBytes(byte[] arg0) {
+ try {
+ String json = new String(arg0, "UTF-8");
+ return Optional.of(gson.fromJson(json, BatchInformation.class));
+ } catch (Exception e) {
+ log.info("Invalid String encoding. BatchInformation cannot be deserialized.");
+ return Optional.empty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java
new file mode 100644
index 0000000..d7c15df
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapter.java
@@ -0,0 +1,73 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * JsonSerializer/JsonDeserializer for serializing/deserializing
+ * {@link BatchInformation} objects. This makes use of the
+ * {@link BatchInformationTypeAdapterFactory} to retrieve the appropriate
+ * JsonSerializer/JsonDeserializer given the class name of the particular
+ * implementation of BatchInformation.
+ *
+ */
+public class BatchInformationTypeAdapter implements JsonSerializer<BatchInformation>, JsonDeserializer<BatchInformation> {
+
+ private static final Logger log = Logger.getLogger(BatchInformationTypeAdapter.class);
+ private static final BatchInformationTypeAdapterFactory factory = new BatchInformationTypeAdapterFactory();
+
+ @Override
+ public BatchInformation deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException {
+ try {
+ JsonObject json = arg0.getAsJsonObject();
+ String type = json.get("class").getAsString();
+ JsonDeserializer<? extends BatchInformation> deserializer = factory.getDeserializerFromName(type);
+ return deserializer.deserialize(arg0, arg1, arg2);
+ } catch (Exception e) {
+ log.trace("Unable to deserialize JsonElement: " + arg0);
+ log.trace("Returning an empty Batch");
+ throw new JsonParseException(e);
+ }
+ }
+
+ @Override
+ public JsonElement serialize(BatchInformation batch, Type arg1, JsonSerializationContext arg2) {
+ JsonSerializer<? extends BatchInformation> serializer = factory.getSerializerFromName(batch.getClass().getName());
+
+ if(batch instanceof SpanBatchDeleteInformation) {
+ return ((SpanBatchInformationTypeAdapter) serializer).serialize((SpanBatchDeleteInformation) batch, arg1, arg2);
+ } else {
+ return ((JoinBatchInformationTypeAdapter) serializer).serialize((JoinBatchInformation) batch, arg1, arg2);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java
new file mode 100644
index 0000000..0221bc2
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationTypeAdapterFactory.java
@@ -0,0 +1,65 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Map;
+
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Factory the uses class names to return the appropriate {@link JsonSerializer} and {@link JsonDeserializer} for serializing
+ * and deserializing {@link BatchInformation} objects.
+ *
+ */
+public class BatchInformationTypeAdapterFactory {
+
+ /**
+ * Retrieve the appropriate {@link JsonSerializer} using the class name of the {@link BatchInformation} implementation
+ * @param name - class name of the BatchInformation object
+ * @return JsonSerializer for serializing BatchInformation objects
+ */
+ public JsonSerializer<? extends BatchInformation> getSerializerFromName(String name) {
+ return serializers.get(name);
+ }
+
+ /**
+ * Retrieve the appropriate {@link JsonDeserializer} using the class name of the {@link BatchInformation} implementation
+ * @param name - class name of the BatchInformation object
+ * @return JsonDeserializer for deserializing BatchInformation objects
+ */
+ public JsonDeserializer<? extends BatchInformation> getDeserializerFromName(String name) {
+ return deserializers.get(name);
+ }
+
+ static final Map<String, JsonSerializer<? extends BatchInformation>> serializers = ImmutableMap.of(
+ SpanBatchDeleteInformation.class.getName(), new SpanBatchInformationTypeAdapter(),
+ JoinBatchInformation.class.getName(), new JoinBatchInformationTypeAdapter()
+ );
+
+ static final Map<String, JsonDeserializer<? extends BatchInformation>> deserializers = ImmutableMap.of(
+ SpanBatchDeleteInformation.class.getName(), new SpanBatchInformationTypeAdapter(),
+ JoinBatchInformation.class.getName(), new JoinBatchInformationTypeAdapter()
+ );
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
new file mode 100644
index 0000000..9f3f1a6
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
@@ -0,0 +1,94 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.lang.reflect.Type;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
+
+import com.google.common.base.Joiner;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * JsonSerializer/JsonDeserializer to serialize/deserialize {@link JoinBatchInformation} objects.
+ *
+ */
+public class JoinBatchInformationTypeAdapter implements JsonSerializer<JoinBatchInformation>, JsonDeserializer<JoinBatchInformation> {
+
+ private static final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter();
+
+ @Override
+ public JsonElement serialize(JoinBatchInformation batch, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject result = new JsonObject();
+ result.add("class", new JsonPrimitive(batch.getClass().getName()));
+ result.add("batchSize", new JsonPrimitive(batch.getBatchSize()));
+ result.add("task", new JsonPrimitive(batch.getTask().name()));
+ Column column = batch.getColumn();
+ result.add("column", new JsonPrimitive(column.getsFamily() + "\u0000" + column.getsQualifier()));
+ Span span = batch.getSpan();
+ result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
+ result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
+ result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+ result.add("varOrder", new JsonPrimitive(Joiner.on(";").join(batch.getVarOrder().getVariableOrders())));
+ result.add("side", new JsonPrimitive(batch.getSide().name()));
+ result.add("joinType", new JsonPrimitive(batch.getJoinType().name()));
+ String updateVarOrderString = Joiner.on(";").join(batch.getBs().getBindingNames());
+ VariableOrder updateVarOrder = new VariableOrder(updateVarOrderString);
+ result.add("bindingSet", new JsonPrimitive(converter.convert(batch.getBs(), updateVarOrder)));
+ result.add("updateVarOrder", new JsonPrimitive(updateVarOrderString));
+ return result;
+ }
+
+ @Override
+ public JoinBatchInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context)
+ throws JsonParseException {
+ JsonObject json = element.getAsJsonObject();
+ int batchSize = json.get("batchSize").getAsInt();
+ Task task = Task.valueOf(json.get("task").getAsString());
+ String[] colArray = json.get("column").getAsString().split("\u0000");
+ Column column = new Column(colArray[0], colArray[1]);
+ String[] rows = json.get("span").getAsString().split("\u0000");
+ boolean startInc = json.get("startInc").getAsBoolean();
+ boolean endInc = json.get("endInc").getAsBoolean();
+ Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
+ VariableOrder varOrder = new VariableOrder(json.get("varOrder").getAsString());
+ VariableOrder updateVarOrder = new VariableOrder(json.get("updateVarOrder").getAsString());
+ VisibilityBindingSet bs = converter.convert(json.get("bindingSet").getAsString(), updateVarOrder);
+ Side side = Side.valueOf(json.get("side").getAsString());
+ JoinType join = JoinType.valueOf(json.get("joinType").getAsString());
+ return JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs).setVarOrder(varOrder)
+ .setSide(side).setJoinType(join).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
new file mode 100644
index 0000000..98deb8e
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
@@ -0,0 +1,69 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.lang.reflect.Type;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * JsonSerializer/JsonDeserializer used to serialize/deserialize {@link SpanBatchDeleteInformation} objects.
+ *
+ */
+public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
+
+ @Override
+ public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ JsonObject json = element.getAsJsonObject();
+ int batchSize = json.get("batchSize").getAsInt();
+ String[] colArray = json.get("column").getAsString().split("\u0000");
+ Column column = new Column(colArray[0], colArray[1]);
+ String[] rows = json.get("span").getAsString().split("\u0000");
+ boolean startInc = json.get("startInc").getAsBoolean();
+ boolean endInc = json.get("endInc").getAsBoolean();
+ Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
+ return SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setSpan(span).setColumn(column).build();
+ }
+
+ @Override
+ public JsonElement serialize(SpanBatchDeleteInformation batch, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject result = new JsonObject();
+ result.add("class", new JsonPrimitive(batch.getClass().getName()));
+ result.add("batchSize", new JsonPrimitive(batch.getBatchSize()));
+ Column column = batch.getColumn();
+ result.add("column", new JsonPrimitive(column.getsFamily() + "\u0000" + column.getsQualifier()));
+ Span span = batch.getSpan();
+ result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
+ result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
+ result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
index 152d156..7c4b3cc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -36,16 +36,16 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
* Incrementally exports SPARQL query results to Kafka topics.
*/
public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
+
private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class);
-
private final KafkaProducer<String, VisibilityBindingSet> producer;
+
/**
* Constructs an instance given a Kafka producer.
*
- * @param producer
- * for sending result set alerts to a broker. (not null)
- * Can be created and configured by {@link KafkaBindingSetExporterFactory}
+ * @param producer for sending result set alerts to a broker. (not null) Can be created and configured by
+ * {@link KafkaBindingSetExporterFactory}
*/
public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
super();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
index 84d3ce6..54c39b7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
@@ -24,8 +24,11 @@ import static java.util.Objects.requireNonNull;
import java.util.Collections;
import org.apache.fluo.api.client.TransactionBase;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -36,14 +39,16 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
private final PrecomputedJoinStorage pcjStorage;
+ private final PeriodicQueryResultStorage periodicStorage;
/**
* Constructs an instance of {@link RyaBindingSetExporter}.
*
* @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
*/
- public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) {
+ public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage, PeriodicQueryResultStorage periodicStorage) {
this.pcjStorage = checkNotNull(pcjStorage);
+ this.periodicStorage = checkNotNull(periodicStorage);
}
@Override
@@ -59,8 +64,12 @@ public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
try {
- pcjStorage.addResults(pcjId, Collections.singleton(result));
- } catch (final PCJStorageException e) {
+ if (result.hasBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
+ periodicStorage.addPeriodicQueryResults(pcjId, Collections.singleton(result));
+ } else {
+ pcjStorage.addResults(pcjId, Collections.singleton(result));
+ }
+ } catch (final PCJStorageException | PeriodicQueryStorageException e) {
throw new ResultExportException("A result could not be exported to Rya.", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
index 86d593f..82ce9c6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -28,8 +28,10 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import com.google.common.base.Optional;
@@ -62,9 +64,10 @@ public class RyaBindingSetExporterFactory implements IncrementalBindingSetExport
// Setup Rya PCJ Storage.
final String ryaInstanceName = params.getRyaInstanceName().get();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
-
+ final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName);
+
// Make the exporter.
- final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage);
+ final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage, periodicStorage);
return Optional.of(exporter);
} catch (final AccumuloException | AccumuloSecurityException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index ac131e3..3a731c2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -30,12 +30,14 @@ import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -50,7 +52,6 @@ import edu.umd.cs.findbugs.annotations.NonNull;
@DefaultAnnotation(NonNull.class)
public abstract class BindingSetUpdater extends AbstractObserver {
private static final Logger log = Logger.getLogger(BindingSetUpdater.class);
-
// DAO
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -60,6 +61,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
+ private final PeriodicQueryUpdater periodicQueryUpdater = new PeriodicQueryUpdater();
@Override
public abstract ObservedColumn getObservedColumn();
@@ -131,6 +133,15 @@ public abstract class BindingSetUpdater extends AbstractObserver {
throw new RuntimeException("Could not process a Join node.", e);
}
break;
+
+ case PERIODIC_QUERY:
+ final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId);
+ try{
+ periodicQueryUpdater.updatePeriodicBinResults(tx, observedBindingSet, parentPeriodicQuery);
+ } catch(Exception e) {
+ throw new RuntimeException("Could not process PeriodicBin node.", e);
+ }
+ break;
case AGGREGATION:
final AggregationMetadata parentAggregation = queryDao.readAggregationMetadata(tx, parentNodeId);
@@ -141,8 +152,9 @@ public abstract class BindingSetUpdater extends AbstractObserver {
}
break;
+
default:
- throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, or Query, but was " + parentNodeType);
+ throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, PeriodicBin or Query, but was " + parentNodeType);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index f5c7177..ee03334 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import org.openrdf.query.BindingSet;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 141ccc7..28e31d8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import org.openrdf.query.BindingSet;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
new file mode 100644
index 0000000..e7072e7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+/**
+ * This Observer is responsible for assigning Periodic Bin Ids to BindingSets.
+ * This class delegates to the {@link BindingSetUpdater} process method, which
+ * uses the {@link PeriodicQueryUpdater} to extract the time stamp from the BindingSet.
+ * The PeriodicQueryUpdater creates one instance of the given BindingSet for each bin
+ * that the time stamp is assigned to by the updater, and these BindingSets are written
+ * to the parent node of the given PeriodicQueryMetadata node.
+ *
+ */
+public class PeriodicQueryObserver extends BindingSetUpdater {
+
+ private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+ private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+
+ @Override
+ public ObservedColumn getObservedColumn() {
+ return new ObservedColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET, NotificationType.STRONG);
+ }
+
+ @Override
+ public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception {
+ requireNonNull(tx);
+ requireNonNull(row);
+
+ // Read the Periodic Query metadata for the node ID encoded in the row.
+ final String periodicBinNodeId = BindingSetRow.make(row).getNodeId();
+ final PeriodicQueryMetadata periodicBinMetadata = queryDao.readPeriodicQueryMetadata(tx, periodicBinNodeId);
+
+ // Deserialize the Visibility Binding Set stored in the observed column of this row.
+ final Bytes valueBytes = tx.get(row, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET);
+ final VisibilityBindingSet periodicBinBindingSet = BS_SERDE.deserialize(valueBytes);
+
+ // Look up this node's parent; its ID is handed to the Observation along with the Binding Set.
+ final String parentNodeId = periodicBinMetadata.getParentNodeId();
+
+ return new Observation(periodicBinNodeId, periodicBinBindingSet, parentNodeId);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index b675ba7..fbdca08 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -29,7 +29,6 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.utils.VisibilitySimplifier;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
@@ -38,6 +37,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporter
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index b0548b4..69a651e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -23,11 +23,11 @@ import static java.util.Objects.requireNonNull;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import org.openrdf.query.BindingSet;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 3c43885..6fc8e91 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -34,12 +34,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
-import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
import com.google.common.base.Charsets;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
index 3bc8da6..ff42a0f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java
@@ -321,6 +321,13 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.varOrder = varOrder;
return this;
}
+
+ /**
+ * @return the variable order of binding sets that are emitted by this node.
+ */
+ public VariableOrder getVariableOrder() {
+ return varOrder;
+ }
/**
* @param parentNodeId - The Node ID of this node's parent.
@@ -330,6 +337,10 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.parentNodeId = parentNodeId;
return this;
}
+
+ public String getParentNodeId() {
+ return parentNodeId;
+ }
/**
* @param childNodeId - The Node ID of this node's child.
@@ -360,6 +371,13 @@ public class AggregationMetadata extends CommonNodeMetadata {
this.groupByVariables = groupByVariables;
return this;
}
+
+ /**
+ * @return variable order that defines how data is grouped for the aggregation function
+ */
+ public VariableOrder getGroupByVariableOrder() {
+ return groupByVariables;
+ }
/**
* @return An instance of {@link AggregationMetadata} build using this builder's values.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
index 8866bd4..7e2e995 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
@@ -18,19 +18,19 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.query;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Objects;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
+
/**
* Metadata that is specific to Filter nodes.
*/
@@ -38,8 +38,7 @@ import com.google.common.base.Objects;
@DefaultAnnotation(NonNull.class)
public class FilterMetadata extends CommonNodeMetadata {
- private final String originalSparql;
- private final int filterIndexWithinSparql;
+ private final String filterSparql;
private final String parentNodeId;
private final String childNodeId;
@@ -48,7 +47,7 @@ public class FilterMetadata extends CommonNodeMetadata {
*
* @param nodeId - The ID the Fluo app uses to reference this node. (not null)
* @param varOrder - The variable order of binding sets that are emitted by this node. (not null)
- * @param originalSparql - The original SPARQL query the filter is derived from. (not null)
+ * @param filterSparql - SPARQL query representing the filter as generated by {@link FilterSerializer#serialize}. (not null)
* @param filterIndexWithinSparql - The index of the filter within the original SPARQL query
* that this node processes. (not null)
* @param parentNodeId - The node id of this node's parent. (not null)
@@ -57,14 +56,11 @@ public class FilterMetadata extends CommonNodeMetadata {
public FilterMetadata(
final String nodeId,
final VariableOrder varOrder,
- final String originalSparql,
- final int filterIndexWithinSparql,
+ final String filterSparql,
final String parentNodeId,
final String childNodeId) {
super(nodeId, varOrder);
- this.originalSparql = checkNotNull(originalSparql);
- checkArgument(filterIndexWithinSparql >= 0 , "filterIndexWithinSparql must be >= 0, was " + filterIndexWithinSparql);
- this.filterIndexWithinSparql = filterIndexWithinSparql;
+ this.filterSparql = checkNotNull(filterSparql);
this.parentNodeId = checkNotNull(parentNodeId);
this.childNodeId = checkNotNull(childNodeId);
}
@@ -72,16 +68,8 @@ public class FilterMetadata extends CommonNodeMetadata {
/**
* @return The original SPARQL query the filter is derived from.
*/
- public String getOriginalSparql() {
- return originalSparql;
- }
-
- /**
- * @return The index of the filter within the original SPARQL query that
- * this node processes.
- */
- public int getFilterIndexWithinSparql() {
- return filterIndexWithinSparql;
+ public String getFilterSparql() {
+ return filterSparql;
}
/**
@@ -103,8 +91,7 @@ public class FilterMetadata extends CommonNodeMetadata {
return Objects.hashCode(
super.getNodeId(),
super.getVariableOrder(),
- originalSparql,
- filterIndexWithinSparql,
+ filterSparql,
parentNodeId,
childNodeId);
}
@@ -119,8 +106,7 @@ public class FilterMetadata extends CommonNodeMetadata {
if(super.equals(o)) {
final FilterMetadata filterMetadata = (FilterMetadata)o;
return new EqualsBuilder()
- .append(originalSparql, filterMetadata.originalSparql)
- .append(filterIndexWithinSparql, filterMetadata.filterIndexWithinSparql)
+ .append(filterSparql, filterMetadata.filterSparql)
.append(parentNodeId, filterMetadata.parentNodeId)
.append(childNodeId, filterMetadata.childNodeId)
.isEquals();
@@ -140,8 +126,7 @@ public class FilterMetadata extends CommonNodeMetadata {
.append(" Variable Order: " + super.getVariableOrder() + "\n")
.append(" Parent Node ID: " + parentNodeId + "\n")
.append(" Child Node ID: " + childNodeId + "\n")
- .append(" Original SPARQL: " + originalSparql + "\n")
- .append(" Filter Index Within SPARQL: " + filterIndexWithinSparql + "\n")
+ .append(" Original SPARQL: " + filterSparql + "\n")
.append("}")
.toString();
}
@@ -164,8 +149,7 @@ public class FilterMetadata extends CommonNodeMetadata {
private final String nodeId;
private VariableOrder varOrder;
- private String originalSparql;
- private int filterIndexWithinSparql;
+ private String filterSparql;
private String parentNodeId;
private String childNodeId;
@@ -202,20 +186,8 @@ public class FilterMetadata extends CommonNodeMetadata {
* @param originalSparql - The original SPARQL query the filter is derived from.
* @return This builder so that method invocations may be chained.
*/
- public Builder setOriginalSparql(final String originalSparql) {
- this.originalSparql = originalSparql;
- return this;
- }
-
- /**
- * Set the index of the filter within the original SPARQL query that this node processes.
- *
- * @param filterIndexWithinSparql - The index of the filter within the original
- * SPARQL query that this node processes.
- * @return This builder so that method invocations may be chained.
- */
- public Builder setFilterIndexWithinSparql(final int filterIndexWithinSparql) {
- this.filterIndexWithinSparql = filterIndexWithinSparql;
+ public Builder setFilterSparql(final String originalSparql) {
+ this.filterSparql = originalSparql;
return this;
}
@@ -248,8 +220,7 @@ public class FilterMetadata extends CommonNodeMetadata {
return new FilterMetadata(
nodeId,
varOrder,
- originalSparql,
- filterIndexWithinSparql,
+ filterSparql,
parentNodeId,
childNodeId);
}