You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/05/13 16:42:45 UTC
[3/6] incubator-rya git commit: RYA-64 - Integrated Rya PCJ Secondary
Index support into core Rya.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
new file mode 100644
index 0000000..83d1c40
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.model.Value;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.primitives.Bytes;
+
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+
+/**
+ * Converts {@link BindingSet}s to byte[]s and back again. The bytes do not
+ * include the binding names and are ordered with a {@link VariableOrder}.
+ * <p>
+ * For every variable name in the {@link VariableOrder}, the serialized form
+ * holds the two byte segments produced for the bound value (nothing when the
+ * variable is unbound) followed by {@code DELIM_BYTES}. Deserialization splits
+ * the bytes on {@code DELIM_BYTE} and expects one slice per variable name.
+ */
+@ParametersAreNonnullByDefault
+public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> {
+
+    /**
+     * Serializes the values of a {@link BindingSet} in the order defined by a
+     * {@link VariableOrder}.
+     *
+     * @param bindingSet - The BindingSet that will be serialized. (not null)
+     * @param varOrder - The order the binding values are written in. (not null)
+     * @return The serialized values, one delimiter-terminated slot per variable.
+     * @throws BindingSetConversionException A value could not be serialized.
+     * @throws IllegalArgumentException The BindingSet has a binding whose name
+     *   is not part of the VariableOrder.
+     */
+    @Override
+    public byte[] convert(final BindingSet bindingSet, final VariableOrder varOrder) throws BindingSetConversionException {
+        checkNotNull(bindingSet);
+        checkNotNull(varOrder);
+        checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+
+        // A list that holds all of the byte segments that will be concatenated at the end.
+        // This minimizes byte[] construction.
+        final List<byte[]> byteSegments = new LinkedList<>();
+
+        try {
+            for(final String varName : varOrder) {
+                // Only write information for a variable name if the binding set contains it.
+                if(bindingSet.hasBinding(varName)) {
+                    final RyaType rt = RdfToRyaConversions.convertValue(bindingSet.getBinding(varName).getValue());
+                    final byte[][] serializedVal = RyaContext.getInstance().serializeType(rt);
+                    byteSegments.add(serializedVal[0]);
+                    byteSegments.add(serializedVal[1]);
+                }
+
+                // But always write the value delimiter. If a value is missing, you'll see two delimiters next to each-other.
+                byteSegments.add(DELIM_BYTES);
+            }
+
+            return concat(byteSegments);
+        } catch (final RyaTypeResolverException e) {
+            throw new BindingSetConversionException("Could not convert the BindingSet into a byte[].", e);
+        }
+    }
+
+    /**
+     * Rebuilds a {@link BindingSet} from bytes written by
+     * {@link #convert(BindingSet, VariableOrder)}.
+     *
+     * @param bindingSetBytes - The serialized binding values. (not null)
+     * @param varOrder - The VariableOrder that was used to write the bytes. (not null)
+     * @return A BindingSet holding a binding for every non-empty value slice.
+     * @throws BindingSetConversionException A value could not be deserialized.
+     * @throws IllegalArgumentException The number of delimited slices does not
+     *   match the length of the VariableOrder, or a value has no type delimiter.
+     */
+    @Override
+    public BindingSet convert(final byte[] bindingSetBytes, final VariableOrder varOrder) throws BindingSetConversionException {
+        checkNotNull(bindingSetBytes);
+        checkNotNull(varOrder);
+
+        try {
+            // Slice the row into bindings.
+            final List<byte[]> values = splitByDelimByte(bindingSetBytes);
+            final String[] varOrderStrings = varOrder.toArray();
+            checkArgument(values.size() == varOrderStrings.length, "The number of Bindings must match the length of the VariableOrder.");
+
+            // Convert the Binding bytes into a BindingSet.
+            final QueryBindingSet bindingSet = new QueryBindingSet();
+
+            // 'values' is a LinkedList, so iterate it instead of calling get(i),
+            // which would rescan the list from its head on every lookup.
+            int i = 0;
+            for(final byte[] valueBytes : values) {
+                // An empty slice means the variable had no binding when it was written.
+                if(valueBytes.length > 0) {
+                    final String name = varOrderStrings[i];
+                    final Value value = deserializeValue(valueBytes);
+                    bindingSet.addBinding(name, value);
+                }
+                i++;
+            }
+
+            return bindingSet;
+        } catch (final RyaTypeResolverException e) {
+            throw new BindingSetConversionException("Could not convert the byte[] into a BindingSet.", e);
+        }
+    }
+
+    /**
+     * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
+     * are a subset of the variables names in {@link VariableOrder}.
+     *
+     * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
+     * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
+     * @throws IllegalArgumentException Indicates the names of the bindings are
+     *   not a subset of the variable order.
+     */
+    private static void checkBindingsSubsetOfVarOrder(final BindingSet bindingSet, final VariableOrder varOrder) throws IllegalArgumentException {
+        checkNotNull(bindingSet);
+        checkNotNull(varOrder);
+
+        final Set<String> bindingNames = bindingSet.getBindingNames();
+        final List<String> varNames = varOrder.getVariableOrders();
+        checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
+    }
+
+    /**
+     * Joins byte segments, in iteration order, into a single array.
+     *
+     * @param byteSegments - The segments to join. (not null)
+     * @return A new array holding every segment back to back.
+     */
+    private static byte[] concat(final Iterable<byte[]> byteSegments) {
+        checkNotNull(byteSegments);
+
+        // Allocate a byte array that is able to hold the segments.
+        int length = 0;
+        for(final byte[] byteSegment : byteSegments) {
+            length += byteSegment.length;
+        }
+        final byte[] result = new byte[length];
+
+        // Copy the segments to the byte array and return it.
+        final ByteBuffer buff = ByteBuffer.wrap(result);
+        for(final byte[] byteSegment : byteSegments) {
+            buff.put(byteSegment);
+        }
+        return result;
+    }
+
+    /**
+     * Splits serialized bytes into the slices that precede each {@code DELIM_BYTE}.
+     * The delimiter itself is not part of a slice, and bytes that follow the
+     * last delimiter are not returned.
+     *
+     * @param bindingSetBytes - The bytes to split. (not null)
+     * @return One slice per delimiter, in order; a slice is empty when two
+     *   delimiters are adjacent or the bytes start with a delimiter.
+     */
+    private static List<byte[]> splitByDelimByte(final byte[] bindingSetBytes) {
+        checkNotNull(bindingSetBytes);
+
+        final List<byte[]> values = new LinkedList<>();
+
+        // Index of the first byte of the slice currently being scanned.
+        int start = 0;
+        for(int pos = 0; pos < bindingSetBytes.length; pos++) {
+            if(bindingSetBytes[pos] == DELIM_BYTE) {
+                // Copy everything from the slice start up to, but excluding, the delimiter.
+                values.add(Arrays.copyOfRange(bindingSetBytes, start, pos));
+
+                // The next slice begins right after this delimiter.
+                start = pos + 1;
+            }
+        }
+
+        return values;
+    }
+
+    /**
+     * Turns one serialized value slice back into a {@link Value}.
+     *
+     * @param byteVal - The bytes of a single value, without the value delimiter. (not null)
+     * @return The deserialized value.
+     * @throws RyaTypeResolverException The bytes could not be deserialized.
+     * @throws IllegalArgumentException The bytes do not contain {@code TYPE_DELIM_BYTE}.
+     */
+    private static Value deserializeValue(final byte[] byteVal) throws RyaTypeResolverException {
+        // The slice must contain a type delimiter that separates the data from the type.
+        final int typeIndex = Bytes.indexOf(byteVal, TYPE_DELIM_BYTE);
+        checkArgument(typeIndex >= 0, "The value bytes do not contain a type delimiter.");
+
+        // 'type' starts at the delimiter, so joining the two parts reproduces the bytes of byteVal.
+        final byte[] data = Arrays.copyOf(byteVal, typeIndex);
+        final byte[] type = Arrays.copyOfRange(byteVal, typeIndex, byteVal.length);
+        final RyaType rt = RyaContext.getInstance().deserialize(Bytes.concat(data, type));
+        return RyaToRdfConversions.convertValue(rt);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
new file mode 100644
index 0000000..9a52531
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Translates {@link BindingSet}s to and from some other representation,
+ * typically the format the BindingSet is stored in.
+ *
+ * @param <T> The type of the model {@link BindingSet}s are converted into/from.
+ */
+@ParametersAreNonnullByDefault
+public interface BindingSetConverter<T> {
+
+    /**
+     * Converts a {@link BindingSet} into the target model. A
+     * {@link VariableOrder} specifies which bindings are written and in what
+     * order; the target model may omit the binding names and may omit some of
+     * the original {@link Binding}s.
+     * <p>
+     * Since the binding names may be absent from the result, the caller has
+     * to keep the VariableOrder around to convert the result back later. If
+     * the result only holds a subset of the original bindings, the original
+     * BindingSet can not be fully reconstructed from it.
+     *
+     * @param bindingSet - The BindingSet that will be converted. (not null)
+     * @param varOrder - Which bindings and in what order they will appear in the
+     *   resulting model. (not null)
+     * @return The BindingSet formatted as the target model.
+     * @throws BindingSetConversionException The BindingSet was unable to be
+     *   converted into the target model. This will happen if the BindingSet has
+     *   a binding whose name is not in the VariableOrder or if one of the values
+     *   could not be converted into the target model.
+     */
+    T convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException;
+
+    /**
+     * Converts a target model value, as produced by
+     * {@link #convert(BindingSet, VariableOrder)}, back into a BindingSet.
+     * <p>
+     * The {@link VariableOrder} supplies the binding names and the order the
+     * values were written in.
+     * <p>
+     * A variable order name that has no value in the model is left out of the
+     * resulting BindingSet.
+     *
+     * @param bindingSet - The BindingSet formatted as the target model that will
+     *   be converted. (not null)
+     * @param varOrder - The VariableOrder that was used to create the target model. (not null)
+     * @return The {@link BindingSet} representation of the target model.
+     * @throws BindingSetConversionException The target model was unable to be
+     *   converted back into a BindingSet.
+     */
+    BindingSet convert(T bindingSet, VariableOrder varOrder) throws BindingSetConversionException;
+
+    /**
+     * Signals that one of the conversion methods of {@link BindingSetConverter}
+     * was unable to convert a {@link BindingSet} to/from the converted model.
+     */
+    class BindingSetConversionException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs an instance of {@link BindingSetConversionException}.
+         *
+         * @param message - Describes why this exception was thrown.
+         */
+        public BindingSetConversionException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs an instance of {@link BindingSetConversionException}
+         * that wraps the exception that triggered it.
+         *
+         * @param message - Describes why this exception was thrown.
+         * @param cause - The exception that caused this one to be thrown.
+         */
+        public BindingSetConversionException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java
new file mode 100644
index 0000000..ddc7be5
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.openrdf.model.Value;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Base class for decorators of a {@link BindingSet}. Every {@link BindingSet}
+ * method is forwarded to the wrapped set.
+ */
+public abstract class BindingSetDecorator implements BindingSet {
+    private static final long serialVersionUID = 1L;
+
+    /** The decorated set that all calls are forwarded to. */
+    protected final BindingSet set;
+
+    /** Cached hash of the decorated set; 0 means it has not been cached yet. */
+    private volatile int hashCode;
+
+    /**
+     * Constructs a new {@link BindingSetDecorator} around the provided
+     * {@link BindingSet}.
+     *
+     * @param set - The {@link BindingSet} to be decorated. (not null)
+     */
+    public BindingSetDecorator(final BindingSet set) {
+        this.set = checkNotNull(set);
+    }
+
+    @Override
+    public Iterator<Binding> iterator() {
+        return set.iterator();
+    }
+
+    @Override
+    public Set<String> getBindingNames() {
+        return set.getBindingNames();
+    }
+
+    @Override
+    public Binding getBinding(final String bindingName) {
+        return set.getBinding(bindingName);
+    }
+
+    @Override
+    public boolean hasBinding(final String bindingName) {
+        return set.hasBinding(bindingName);
+    }
+
+    @Override
+    public Value getValue(final String bindingName) {
+        return set.getValue(bindingName);
+    }
+
+    @Override
+    public int size() {
+        return set.size();
+    }
+
+    /**
+     * Two decorators are equal when the sets they decorate are equal. Objects
+     * that are not a {@link BindingSetDecorator} are never equal to one.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if(o instanceof BindingSetDecorator) {
+            return set.equals(((BindingSetDecorator) o).set);
+        }
+        return false;
+    }
+
+    /**
+     * Returns the hash code of the decorated set, caching it after the first
+     * computation unless it is 0.
+     */
+    @Override
+    public int hashCode() {
+        int cached = hashCode;
+        if(cached == 0) {
+            cached = set.hashCode();
+            hashCode = cached;
+        }
+        return cached;
+    }
+
+    /**
+     * Lists every binding name together with the string form of its value.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder(" names: ");
+        for (final String bindingName : getBindingNames()) {
+            final String valueText = getBinding(bindingName).getValue().toString();
+            text.append("\n [name]: ")
+                .append(bindingName)
+                .append(" --- [value]: ")
+                .append(valueText);
+        }
+        return text.toString();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
new file mode 100644
index 0000000..913cfc9
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+
+/**
+ * Converts {@link BindingSet}s to Strings and back again. The Strings do not
+ * include the binding names and are ordered with a {@link VariableOrder}.
+ * <p>
+ * Each bound value is written as its data, {@link #TYPE_DELIM}, and its data
+ * type; an unbound variable is written as {@link #NULL_VALUE_STRING}. The
+ * per-variable strings are joined with {@link #BINDING_DELIM}.
+ */
+@ParametersAreNonnullByDefault
+public class BindingSetStringConverter implements BindingSetConverter<String> {
+
+    public static final String BINDING_DELIM = ":::";
+    public static final String TYPE_DELIM = "<<~>>";
+    public static final String NULL_VALUE_STRING = Character.toString( '\0' );
+
+    private static final ValueFactory valueFactory = new ValueFactoryImpl();
+
+    /**
+     * Serializes the values of a {@link BindingSet} in the order defined by a
+     * {@link VariableOrder}.
+     *
+     * @param bindingSet - The BindingSet that will be converted. (not null)
+     * @param varOrder - The order the binding values are written in. (not null)
+     * @return The binding values joined with {@link #BINDING_DELIM}; the empty
+     *   String when the VariableOrder has no variables.
+     * @throws IllegalArgumentException The BindingSet has a binding whose name
+     *   is not part of the VariableOrder.
+     */
+    @Override
+    public String convert(final BindingSet bindingSet, final VariableOrder varOrder) {
+        checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+
+        // Convert each Binding to a String.
+        final List<String> bindingStrings = new ArrayList<>();
+        for(final String varName : varOrder) {
+            if(bindingSet.hasBinding(varName)) {
+                // Add a value to the binding set.
+                final Value value = bindingSet.getBinding(varName).getValue();
+                final RyaType ryaValue = RdfToRyaConversions.convertValue(value);
+                final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
+                bindingStrings.add(bindingString);
+            } else {
+                // Add a null value to the binding set.
+                bindingStrings.add(NULL_VALUE_STRING);
+            }
+        }
+
+        // Join the bindings using the binding delim.
+        return Joiner.on(BINDING_DELIM).join(bindingStrings);
+    }
+
+    /**
+     * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
+     * are a subset of the variables names in {@link VariableOrder}.
+     *
+     * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
+     * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
+     * @throws IllegalArgumentException Indicates the names of the bindings are
+     *   not a subset of the variable order.
+     */
+    private static void checkBindingsSubsetOfVarOrder(final BindingSet bindingSet, final VariableOrder varOrder) throws IllegalArgumentException {
+        checkNotNull(bindingSet);
+        checkNotNull(varOrder);
+
+        final Set<String> bindingNames = bindingSet.getBindingNames();
+        final List<String> varNames = varOrder.getVariableOrders();
+        checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
+    }
+
+    /**
+     * Rebuilds a {@link BindingSet} from a String written by
+     * {@link #convert(BindingSet, VariableOrder)}.
+     *
+     * @param bindingSetString - The serialized binding values. (not null)
+     * @param varOrder - The VariableOrder that was used to write the String. (not null)
+     * @return A BindingSet holding a binding for every value that is not
+     *   {@link #NULL_VALUE_STRING}.
+     * @throws IllegalArgumentException The number of serialized bindings does
+     *   not match the length of the VariableOrder, or a binding does not hold
+     *   both data and type information.
+     */
+    @Override
+    public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) {
+        checkNotNull(bindingSetString);
+        checkNotNull(varOrder);
+
+        final String[] varOrderArr = varOrder.toArray();
+
+        // An empty VariableOrder is serialized as the empty String, but
+        // "".split(...) yields a single empty element rather than no elements,
+        // which would fail the length check below. Handle that case directly.
+        if(bindingSetString.isEmpty() && varOrderArr.length == 0) {
+            return new QueryBindingSet();
+        }
+
+        final String[] bindingStrings = bindingSetString.split(BINDING_DELIM);
+        checkArgument(varOrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder.");
+
+        final QueryBindingSet bindingSet = new QueryBindingSet();
+        for(int i = 0; i < bindingStrings.length; i++) {
+            final String bindingString = bindingStrings[i];
+            // The null marker means the variable had no binding when it was written.
+            if(!NULL_VALUE_STRING.equals(bindingString)) {
+                final String name = varOrderArr[i];
+                final Value value = toValue(bindingString);
+                bindingSet.addBinding(name, value);
+            }
+        }
+        return bindingSet;
+    }
+
+    /**
+     * Creates a {@link Value} from a String representation of it.
+     *
+     * @param valueString - The String representation of the value. (not null)
+     * @return The {@link Value} representation of the String.
+     * @throws IllegalArgumentException The String does not split into exactly
+     *   a data part and a type part around {@link #TYPE_DELIM}.
+     */
+    protected static Value toValue(final String valueString) {
+        checkNotNull(valueString);
+
+        // Split the serialized binding into its Value and Type parts.
+        final String[] valueAndType = valueString.split(TYPE_DELIM);
+        if(valueAndType.length != 2) {
+            throw new IllegalArgumentException("Array must contain data and type info!");
+        }
+
+        final String dataString = valueAndType[0];
+        final String typeString = valueAndType[1];
+
+        // Convert the String Type into a URI that describes the type.
+        final URI typeURI = valueFactory.createURI(typeString);
+
+        // Convert the String Value into a Value: an xsd:anyURI type denotes a
+        // URI, every other type is treated as a typed literal.
+        final Value value = typeURI.equals(XMLSchema.ANYURI) ?
+                valueFactory.createURI(dataString) :
+                valueFactory.createLiteral(dataString, new URIImpl(typeString));
+
+        return value;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
new file mode 100644
index 0000000..653b4cc
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.UUID;
+
+/**
+ * Builds the names of Accumulo tables that Rya may recognize as holding the
+ * results of a Precomputed Join (PCJ).
+ */
+public class PcjTableNameFactory {
+
+    /**
+     * Creates an Accumulo table name that may be recognized by Rya as a table
+     * that holds the results of a Precomputed Join.
+     * <p>
+     * An Accumulo cluster may host more than one Rya instance. To keep each
+     * instance's RDF Triples apart, every table owned by a Rya instance has
+     * the instance's {@code tablePrefix} prepended to its name. A PCJ table is
+     * owned by a specific Rya instance, so its name starts with that prefix
+     * as well.
+     * <p>
+     * When Rya scans for PCJ tables that it may use when creating execution
+     * plans, it looks for any table whose name starts with its
+     * {@code tablePrefix} immediately followed by "INDEX". Whatever follows is
+     * just a unique identifier for the SPARQL query that is being precomputed.
+     * Here's an example of what a table name may look like:
+     * <pre>
+     *     demo_INDEX_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9
+     * </pre>
+     * The "demo_INDEX" portion indicates this table is a PCJ table for the "demo_"
+     * instance of Rya. The "_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9" portion
+     * could be anything at all that uniquely identifies the query that is being updated.
+     *
+     * @param tablePrefix - The Rya instance's table prefix. (not null)
+     * @param uniqueId - The unique portion of the Rya PCJ table name. (not null)
+     * @return The table prefix, followed by "INDEX_", followed by the unique id.
+     */
+    public String makeTableName(final String tablePrefix, final String uniqueId) {
+        final StringBuilder tableName = new StringBuilder();
+        tableName.append(tablePrefix);
+        tableName.append("INDEX_");
+        tableName.append(uniqueId);
+        return tableName.toString();
+    }
+
+    /**
+     * Invokes {@link #makeTableName(String, String)} with a randomly generated
+     * UUID, stripped of its dashes, as the {@code uniqueId}.
+     *
+     * @param tablePrefix - The Rya instance's table prefix. (not null)
+     * @return A Rya PCJ table name built using the provided values.
+     */
+    public String makeTableName(final String tablePrefix) {
+        final String dashedUuid = UUID.randomUUID().toString();
+        return makeTableName(tablePrefix, dashedUuid.replace("-", ""));
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
new file mode 100644
index 0000000..db74996
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.lexicoder.ListLexicoder;
+import org.apache.accumulo.core.client.lexicoder.LongLexicoder;
+import org.apache.accumulo.core.client.lexicoder.StringLexicoder;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+
+import com.google.common.base.Optional;
+
+/**
+ * Functions that create and maintain the PCJ tables that are used by Rya.
+ */
+@ParametersAreNonnullByDefault
+public class PcjTables {
+ private static final Logger log = Logger.getLogger(PcjTables.class);
+
+ /**
+ * The Row ID that every {@link PcjMetadata} entry is written under within a PCJ table.
+ */
+ private static final Text PCJ_METADATA_ROW_ID = new Text("pcjMetadata");
+
+ /**
+ * The Column Family shared by all of the PCJ metadata entries.
+ */
+ private static final Text PCJ_METADATA_FAMILY = new Text("metadata");
+
+ /**
+ * The Column Qualifier of the entry that holds the SPARQL query a PCJ is built from.
+ */
+ private static final Text PCJ_METADATA_SPARQL_QUERY = new Text("sparql");
+
+ /**
+ * The Column Qualifier of the entry that holds the cardinality of a PCJ.
+ */
+ private static final Text PCJ_METADATA_CARDINALITY = new Text("cardinality");
+
+ /**
+ * The Column Qualifier of the entry that holds the variable orders a PCJ's results are written to.
+ */
+ private static final Text PCJ_METADATA_VARIABLE_ORDERS = new Text("variableOrders");
+
+ // Lexicoders that encode/decode the PcjMetadata values (query string, cardinality, variable order list) stored in Accumulo.
+ private static final LongLexicoder longLexicoder = new LongLexicoder();
+ private static final StringLexicoder stringLexicoder = new StringLexicoder();
+ private static final ListLexicoder<String> listLexicoder = new ListLexicoder<>(stringLexicoder);
+
+ /**
+  * Create a new PCJ table within an Accumulo instance for a SPARQL query.
+  * If a table with the provided name already exists, nothing is created or written.
+  * <p>
+  * For example, calling the function like this:
+  * <pre>
+  * PcjTables.createPcjTable(
+  *     accumuloConn,
+  *
+  *     "foo_INDEX_query1234",
+  *
+  *     Sets.newHashSet(
+  *         new VariableOrder("city;worker;customer"),
+  *         new VariableOrder("worker;customer;city") ,
+  *         new VariableOrder("customer;city;worker")),
+  *
+  *     "SELECT ?customer ?worker ?city { " +
+  *         "?customer &lt;http://talksTo&gt; ?worker. " +
+  *         "?worker &lt;http://livesIn&gt; ?city. " +
+  *         "?worker &lt;http://worksAt&gt; &lt;http://Home&gt;. " +
+  *     "}");
+  * </pre>
+  * Will result in an Accumulo table named "foo_INDEX_query1234" with the following entries:
+  * <table border="1" style="width:100%">
+  *   <tr> <th>Row ID</th> <th>Column</th> <th>Value</th> </tr>
+  *   <tr> <td>pcjMetadata</td> <td>metadata:sparql</td> <td> ... UTF-8 bytes encoding the query string ... </td> </tr>
+  *   <tr> <td>pcjMetadata</td> <td>metadata:cardinality</td> <td> The query's cardinality </td> </tr>
+  *   <tr> <td>pcjMetadata</td> <td>metadata:variableOrders</td> <td> The variable orders the results are written to </td> </tr>
+  * </table>
+  *
+  * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+  * @param pcjTableName - The name of the table that will be created. (not null)
+  * @param varOrders - The variable orders the results within the table will be written to. (not null)
+  * @param sparql - The query this table's results solve. (not null)
+  * @throws PCJStorageException Could not create a new PCJ table either because Accumulo
+  *   would not let us create it or the PCJ metadata was not able to be written to it.
+  */
+ public void createPcjTable(
+         final Connector accumuloConn,
+         final String pcjTableName,
+         final Set<VariableOrder> varOrders,
+         final String sparql) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+     checkNotNull(varOrders);
+     checkNotNull(sparql);
+
+     final TableOperations tableOps = accumuloConn.tableOperations();
+     if(tableOps.exists(pcjTableName)) {
+         // The table is already there, so there is nothing to create.
+         return;
+     }
+
+     try {
+         // Create the new table in Accumulo.
+         tableOps.create(pcjTableName);
+
+         // Write the PCJ Metadata to the newly created table. The cardinality starts at 0.
+         final PcjMetadata pcjMetadata = new PcjMetadata(sparql, 0L, varOrders);
+         final List<Mutation> mutations = makeWriteMetadataMutations(pcjMetadata);
+
+         final BatchWriter writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+         try {
+             writer.addMutations(mutations);
+         } finally {
+             // Always release the writer, even when the mutations could not be added.
+             writer.close();
+         }
+     } catch (final TableExistsException e) {
+         log.warn("Something else just created the Rya PCJ export table named '" + pcjTableName
+                 + "'. This is unexpected, but we will continue as normal.");
+     } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+         throw new PCJStorageException("Could not create a new PCJ named: " + pcjTableName, e);
+     }
+ }
+
+ /**
+  * Create the {@link Mutation}s required to write a {@link PcjMetadata} object
+  * to an Accumulo table. One mutation is produced for each metadata entry
+  * (SPARQL query, cardinality, variable orders), all under the metadata Row ID.
+  *
+  * @param metadata - The metadata to write. (not null)
+  * @return An ordered list of mutations that write the metadata to an Accumulo table.
+  */
+ private static List<Mutation> makeWriteMetadataMutations(final PcjMetadata metadata) {
+     checkNotNull(metadata);
+
+     final List<Mutation> mutations = new ArrayList<>(3);
+
+     // SPARQL Query
+     final Mutation sparqlMutation = new Mutation(PCJ_METADATA_ROW_ID);
+     final Value query = new Value( stringLexicoder.encode(metadata.getSparql()) );
+     sparqlMutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_SPARQL_QUERY, query);
+     mutations.add(sparqlMutation);
+
+     // Cardinality (boxed with Long.valueOf rather than the deprecated Long constructor)
+     final Mutation cardinalityMutation = new Mutation(PCJ_METADATA_ROW_ID);
+     final Value cardinality = new Value( longLexicoder.encode(Long.valueOf(metadata.getCardinality())) );
+     cardinalityMutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, cardinality);
+     mutations.add(cardinalityMutation);
+
+     // Variable Orders, stored as a list of their String forms.
+     final List<String> varOrderStrings = new ArrayList<>();
+     for(final VariableOrder varOrder : metadata.getVarOrders()) {
+         varOrderStrings.add( varOrder.toString() );
+     }
+
+     final Mutation varOrdersMutation = new Mutation(PCJ_METADATA_ROW_ID);
+     final Value variableOrders = new Value( listLexicoder.encode(varOrderStrings) );
+     varOrdersMutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_VARIABLE_ORDERS, variableOrders);
+     mutations.add(varOrdersMutation);
+
+     return mutations;
+ }
+
+ /**
+  * Fetch the {@link PcjMetadata} from an Accumulo table.
+  * <p>
+  * This method assumes the PCJ table has already been created. Only the
+  * metadata row and column family are scanned, so the PCJ's results are not read.
+  *
+  * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+  * @param pcjTableName - The name of the table that will be searched. (not null)
+  * @return The PCJ Metadata that has been stored in the PCJ Table.
+  * @throws PCJStorageException The PCJ Table does not exist, or it does not
+  *   hold a complete set of PCJ metadata entries.
+  */
+ public PcjMetadata getPcjMetadata(
+         final Connector accumuloConn,
+         final String pcjTableName) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+
+     Scanner scanner = null;
+     try {
+         // Create an Accumulo scanner that iterates through only the metadata entries.
+         // Results live in other rows/column families, so they are excluded from the scan.
+         scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
+         scanner.setRange( new org.apache.accumulo.core.data.Range(PCJ_METADATA_ROW_ID) );
+         scanner.fetchColumnFamily(PCJ_METADATA_FAMILY);
+         final Iterator<Entry<Key, Value>> entries = scanner.iterator();
+
+         // No metadata has been stored in the table yet.
+         if(!entries.hasNext()) {
+             throw new PCJStorageException("Could not find any PCJ metadata in the table named: " + pcjTableName);
+         }
+
+         // Decode each metadata entry based on its column qualifier.
+         String sparql = null;
+         Long cardinality = null;
+         final Set<VariableOrder> varOrders = new HashSet<>();
+
+         while(entries.hasNext()) {
+             final Entry<Key, Value> entry = entries.next();
+             final Text columnQualifier = entry.getKey().getColumnQualifier();
+             final byte[] value = entry.getValue().get();
+
+             if(columnQualifier.equals(PCJ_METADATA_SPARQL_QUERY)) {
+                 sparql = stringLexicoder.decode(value);
+             } else if(columnQualifier.equals(PCJ_METADATA_CARDINALITY)) {
+                 cardinality = longLexicoder.decode(value);
+             } else if(columnQualifier.equals(PCJ_METADATA_VARIABLE_ORDERS)) {
+                 for(final String varOrderStr : listLexicoder.decode(value)) {
+                     varOrders.add( new VariableOrder(varOrderStr) );
+                 }
+             }
+         }
+
+         // Report partially written metadata explicitly instead of failing while unboxing a null.
+         if(sparql == null || cardinality == null) {
+             throw new PCJStorageException("The PCJ metadata is incomplete in the table named: " + pcjTableName);
+         }
+
+         return new PcjMetadata(sparql, cardinality, varOrders);
+
+     } catch (final TableNotFoundException e) {
+         throw new PCJStorageException("Could not fetch the PCJ metadata because the PCJ table does not exist: " + pcjTableName, e);
+     } finally {
+         // Release the scanner's resources regardless of how the read ended.
+         if(scanner != null) {
+             scanner.close();
+         }
+     }
+ }
+
+ /**
+  * Writes a collection of results to a PCJ table and then raises the table's
+  * cardinality by the number of results that were provided.
+  * <p>
+  * This method assumes the PCJ table has already been created.
+  *
+  * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+  * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+  * @param results - Binding sets that will be written to the PCJ table. (not null)
+  * @throws PCJStorageException The provided PCJ table doesn't exist, is missing the
+  *   PCJ metadata, or the result could not be written to it.
+  */
+ public void addResults(
+         final Connector accumuloConn,
+         final String pcjTableName,
+         final Collection<VisibilityBindingSet> results) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+     checkNotNull(results);
+
+     // First store every result under each of the table's variable orders.
+     writeResults(accumuloConn, pcjTableName, results);
+
+     // Then bump the stored cardinality by how many results were handed in.
+     final long addedCount = results.size();
+     updateCardinality(accumuloConn, pcjTableName, addedCount);
+ }
+
+ /**
+  * Writes a collection of results to a specific PCJ table, once for each
+  * variable order listed in the table's metadata.
+  *
+  * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+  * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+  * @param results - Binding sets that will be written to the PCJ table. (not null)
+  * @throws PCJStorageException The provided PCJ table doesn't exist, is missing the
+  *   PCJ metadata, or the result could not be written to it.
+  */
+ private void writeResults(
+         final Connector accumuloConn,
+         final String pcjTableName,
+         final Collection<VisibilityBindingSet> results) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+     checkNotNull(results);
+
+     // The table's metadata lists the variable orders each result must be written in.
+     final PcjMetadata pcjMetadata = getPcjMetadata(accumuloConn, pcjTableName);
+
+     BatchWriter batchWriter = null;
+     try {
+         batchWriter = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+
+         // Queue up the mutations for every result, one per variable order.
+         for(final VisibilityBindingSet bindingSet : results) {
+             batchWriter.addMutations( makeWriteResultMutations(pcjMetadata.getVarOrders(), bindingSet) );
+         }
+     } catch (TableNotFoundException | MutationsRejectedException e) {
+         throw new PCJStorageException("Could not add results to the PCJ table named: " + pcjTableName, e);
+     } finally {
+         if(batchWriter != null) {
+             try {
+                 // Closing flushes whatever is still buffered in the writer.
+                 batchWriter.close();
+             } catch (final MutationsRejectedException e) {
+                 throw new PCJStorageException("Could not add results to a PCJ table because some of the mutations were rejected.", e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Builds the {@link Mutation}s that write a single result to a PCJ table,
+  * one for each {@link VariableOrder} that is provided.
+  *
+  * @param varOrders - The variable orders the result will be written to. (not null)
+  * @param result - A new PCJ result. (not null)
+  * @return The mutations that will write the result to a PCJ table.
+  * @throws PCJStorageException The binding set could not be encoded.
+  */
+ private static Set<Mutation> makeWriteResultMutations(
+         final Set<VariableOrder> varOrders,
+         final VisibilityBindingSet result) throws PCJStorageException {
+     checkNotNull(varOrders);
+     checkNotNull(result);
+
+     final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();
+     final Set<Mutation> resultMutations = new HashSet<>();
+
+     for(final VariableOrder order : varOrders) {
+         // Serialize the result's values in the sequence this variable order dictates.
+         final byte[] rowId;
+         try {
+             rowId = serializer.convert(result, order);
+         } catch(final BindingSetConversionException e) {
+             throw new PCJStorageException("Could not serialize a result.", e);
+         }
+
+         // Row ID = binding set values, Column Family = variable order of the binding set.
+         // The qualifier and value are left empty; the result's visibility guards the entry.
+         final Mutation writeResult = new Mutation(rowId);
+         final ColumnVisibility columnVisibility = new ColumnVisibility( result.getVisibility() );
+         writeResult.put(order.toString(), "", columnVisibility, "");
+         resultMutations.add(writeResult);
+     }
+
+     return resultMutations;
+ }
+
+ /**
+  * Changes the cardinality stored in a PCJ table's metadata by {@code delta}.
+  * The change is attempted with a conditional mutation and is retried until
+  * it is accepted, or until an unknown outcome is followed by a changed value.
+  *
+  * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+  * @param pcjTableName - The name of the PCJ table that will have its cardinality updated. (not null)
+  * @param delta - How much the cardinality will change.
+  * @throws PCJStorageException The cardinality could not be updated.
+  */
+ private void updateCardinality(
+         final Connector accumuloConn,
+         final String pcjTableName,
+         final long delta) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+
+     ConditionalWriter conditionalWriter = null;
+     try {
+         conditionalWriter = accumuloConn.createConditionalWriter(pcjTableName, new ConditionalWriterConfig());
+
+         boolean committed = false;
+         do {
+             // Read the stored cardinality and ask Accumulo to replace it only if it is unchanged.
+             final long observed = getPcjMetadata(accumuloConn, pcjTableName).getCardinality();
+             final ConditionalMutation update = makeUpdateCardinalityMutation(observed, delta);
+             final ConditionalWriter.Result outcome = conditionalWriter.write(update);
+
+             switch(outcome.getStatus()) {
+                 case ACCEPTED:
+                     committed = true;
+                     break;
+                 case REJECTED:
+                     // The stored value did not match what was read, so read it again and retry.
+                     break;
+                 case UNKNOWN: {
+                     // We do not know if the mutation succeeded. At best, we can hope the metadata hasn't been
+                     // updated since we originally fetched it and try again. Otherwise, continue forwards as
+                     // if it worked. It's okay if this number is slightly off.
+                     final long reread = getPcjMetadata(accumuloConn, pcjTableName).getCardinality();
+                     committed = (reread != observed);
+                     break;
+                 }
+                 case VIOLATED:
+                     throw new PCJStorageException("The cardinality could not be updated because the commit violated a table constraint.");
+                 case INVISIBLE_VISIBILITY:
+                     throw new PCJStorageException("The condition contains a visibility the updater can not satisfy.");
+             }
+         } while(!committed);
+     } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+         throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e);
+     } finally {
+         if(conditionalWriter != null) {
+             conditionalWriter.close();
+         }
+     }
+ }
+
+ /**
+  * Builds a {@link ConditionalMutation} that writes {@code current + delta}
+  * as the PCJ table's cardinality, but only if the stored cardinality still
+  * equals {@code current} when the mutation is committed to Accumulo.
+  *
+  * @param current - The current cardinality value.
+  * @param delta - How much the cardinality will change.
+  * @return The mutation that will perform the conditional update.
+  */
+ private static ConditionalMutation makeUpdateCardinalityMutation(final long current, final long delta) {
+     // Condition: the cardinality entry must still hold the value that was just read.
+     final Condition unchangedSinceRead = new Condition(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY);
+     unchangedSinceRead.setValue( longLexicoder.encode(current) );
+
+     // Update: when the condition holds, store the adjusted cardinality.
+     final Value updatedCardinality = new Value( longLexicoder.encode(current + delta) );
+
+     final ConditionalMutation conditionalUpdate = new ConditionalMutation(PCJ_METADATA_ROW_ID);
+     conditionalUpdate.addCondition(unchangedSinceRead);
+     conditionalUpdate.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, updatedCardinality);
+     return conditionalUpdate;
+ }
+
+ /**
+  * Scan Rya for results that solve the PCJ's query and store them in the PCJ table.
+  * <p>
+  * This method assumes the PCJ table has already been created.
+  *
+  * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+  * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+  * @param ryaConn - A connection to the Rya store that will be queried to find results. (not null)
+  * @throws PCJStorageException If results could not be written to the PCJ table,
+  *   the PCJ table does not exist, or the query that is being executed
+  *   was malformed.
+  */
+ public void populatePcj(
+         final Connector accumuloConn,
+         final String pcjTableName,
+         final RepositoryConnection ryaConn) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+     checkNotNull(ryaConn);
+
+     // The number of results that are written to the PCJ table at a time.
+     final int batchSize = 1000;
+
+     try {
+         // Fetch the query that needs to be executed from the PCJ table.
+         final PcjMetadata pcjMetadata = getPcjMetadata(accumuloConn, pcjTableName);
+         final String sparql = pcjMetadata.getSparql();
+
+         // Query Rya for results to the SPARQL query.
+         final TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql);
+         final TupleQueryResult results = query.evaluate();
+
+         try {
+             // Load the results into the PCJ table one batch at a time.
+             final Set<VisibilityBindingSet> batch = new HashSet<>(batchSize);
+             while(results.hasNext()) {
+                 batch.add( new VisibilityBindingSet(results.next()) );
+
+                 if(batch.size() == batchSize) {
+                     addResults(accumuloConn, pcjTableName, batch);
+                     batch.clear();
+                 }
+             }
+
+             // Write whatever is left over from the final, partially filled batch.
+             if(!batch.isEmpty()) {
+                 addResults(accumuloConn, pcjTableName, batch);
+             }
+         } finally {
+             // Release the query result's resources even if writing a batch failed.
+             results.close();
+         }
+
+     } catch (RepositoryException | MalformedQueryException | QueryEvaluationException e) {
+         throw new PCJStorageException("Could not populate a PCJ table with Rya results for the table named: " + pcjTableName, e);
+     }
+ }
+
+ private static final PcjVarOrderFactory DEFAULT_VAR_ORDER_FACTORY = new ShiftVarOrderFactory();
+
+ /**
+  * Creates a new PCJ Table in Accumulo and then populates it by scanning an
+  * instance of Rya for historic matches.
+  * <p>
+  * If any portion of this operation fails along the way, the partially
+  * created PCJ table will be left in Accumulo.
+  *
+  * @param ryaConn - Connects to the Rya that will be scanned. (not null)
+  * @param accumuloConn - Connects to the accumulo that hosts the PCJ results. (not null)
+  * @param pcjTableName - The name of the PCJ table that will be created. (not null)
+  * @param sparql - The SPARQL query whose results will be loaded into the table. (not null)
+  * @param resultVariables - The variables that are included in the query's resulting binding sets. (not null)
+  * @param pcjVarOrderFactory - An optional factory that indicates the various variable orders
+  *   the results will be stored in. If one is not provided, then {@link ShiftVarOrderFactory}
+  *   is used by default. (not null)
+  * @throws PCJStorageException The PCJ table could not be created or the values from
+  *   Rya were not able to be loaded into it.
+  */
+ public void createAndPopulatePcj(
+         final RepositoryConnection ryaConn,
+         final Connector accumuloConn,
+         final String pcjTableName,
+         final String sparql,
+         final String[] resultVariables,
+         final Optional<PcjVarOrderFactory> pcjVarOrderFactory) throws PCJStorageException {
+     checkNotNull(ryaConn);
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+     checkNotNull(sparql);
+     checkNotNull(resultVariables);
+     checkNotNull(pcjVarOrderFactory);
+
+     // Derive the PCJ's variable orders, falling back to the default factory when none was supplied.
+     final VariableOrder queryOrder = new VariableOrder(resultVariables);
+     final Set<VariableOrder> pcjVarOrders = pcjVarOrderFactory.or(DEFAULT_VAR_ORDER_FACTORY).makeVarOrders(queryOrder);
+
+     // Create the PCJ table in Accumulo.
+     createPcjTable(accumuloConn, pcjTableName, pcjVarOrders, sparql);
+
+     // Load historic matches from Rya into the PCJ table.
+     populatePcj(accumuloConn, pcjTableName, ryaConn);
+ }
+
+ /**
+ * List the table names of the PCJ index tables that are stored in Accumulo
+ * for a specific instance of Rya.
+ *
+ * @param accumuloConn - Connects to the accumulo that hosts the PCJ indices. (not null)
+ * @param ryaInstanceName - The name of the Rya instance. (not null)
+ * @return A list of Accumulo table names that hold PCJ index data for a
+ * specific Rya instance.
+ */
+ public List<String> listPcjTables(final Connector accumuloConn, final String ryaInstanceName) {
+ checkNotNull(accumuloConn);
+ checkNotNull(ryaInstanceName);
+
+ final List<String> pcjTables = new ArrayList<>();
+
+ final String pcjPrefix = ryaInstanceName + "INDEX";
+ boolean foundInstance = false;
+
+ for(final String tableName : accumuloConn.tableOperations().list()) {
+ if(tableName.startsWith(ryaInstanceName)) {
+ // This table is part of the target Rya instance.
+ foundInstance = true;
+
+ if(tableName.startsWith(pcjPrefix)) {
+ pcjTables.add(tableName);
+ }
+ }
+
+ else if(foundInstance) {
+ // We have encountered the first table name that does not start
+ // with the rya instance name after those that do. Because the
+ // list is sorted, there can't be any more pcj tables for the
+ // target instance in the list.
+ break;
+ }
+ }
+
+ return pcjTables;
+ }
+
+ /**
+  * Deletes all of the rows that are in a PCJ index and sets its cardinality back to 0.
+  * The SPARQL query and variable orders from the old metadata are written back
+  * after the rows have been deleted.
+  *
+  * @param accumuloConn - Connects to the Accumulo that hosts the PCJ indices. (not null)
+  * @param pcjTableName - The name of the PCJ table that will be purged. (not null)
+  * @throws PCJStorageException Either the rows could not be dropped from the
+  *   PCJ table or the metadata could not be written back to the table.
+  */
+ public void purgePcjTable(final Connector accumuloConn, final String pcjTableName) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+
+     // Remember the metadata, because deleting every row removes it as well.
+     final PcjMetadata previous = getPcjMetadata(accumuloConn, pcjTableName);
+
+     // Delete all of the rows (null start and end rows are passed to deleteRows).
+     final TableOperations tableOps = accumuloConn.tableOperations();
+     try {
+         tableOps.deleteRows(pcjTableName, null, null);
+     } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+         throw new PCJStorageException("Could not delete the rows of data from PCJ table named: " + pcjTableName, e);
+     }
+
+     // Store the metadata again with the cardinality reset to 0.
+     final PcjMetadata reset = new PcjMetadata(previous.getSparql(), 0L, previous.getVarOrders());
+     final List<Mutation> resetMutations = makeWriteMetadataMutations(reset);
+
+     BatchWriter batchWriter = null;
+     try {
+         batchWriter = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+         batchWriter.addMutations(resetMutations);
+         batchWriter.flush();
+     } catch (final TableNotFoundException | MutationsRejectedException e) {
+         throw new PCJStorageException("Could not rewrite the PCJ cardinality for table named '"
+                 + pcjTableName + "'. This table will not work anymore.", e);
+     } finally {
+         if(batchWriter != null) {
+             try {
+                 batchWriter.close();
+             } catch (final MutationsRejectedException e) {
+                 throw new PCJStorageException("Could not close the batch writer.", e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Deletes a PCJ index table from Accumulo.
+  *
+  * @param accumuloConn - Connects to the Accumulo that hosts the PCJ indices. (not null)
+  * @param pcjTableName - The name of the PCJ table that will be dropped. (not null)
+  * @throws PCJStorageException - The table could not be dropped because of
+  *   a security exception or because it does not exist.
+  */
+ public void dropPcjTable(final Connector accumuloConn, final String pcjTableName) throws PCJStorageException {
+     checkNotNull(accumuloConn);
+     checkNotNull(pcjTableName);
+
+     final TableOperations tableOps = accumuloConn.tableOperations();
+     try {
+         tableOps.delete(pcjTableName);
+     } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+         throw new PCJStorageException("Could not delete PCJ table named: " + pcjTableName, e);
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
new file mode 100644
index 0000000..f806d6e
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.Set;
+
+/**
+ * A strategy for deriving the alternative variable orders of a SPARQL
+ * query from the original ordering of its results.
+ */
+public interface PcjVarOrderFactory {
+
+    /**
+     * Derives alternative variable orders for a SPARQL query from the
+     * original ordering of its results.
+     *
+     * @param varOrder - The initial variable order of a SPARQL query. (not null)
+     * @return A set of alternative variable orders for the original.
+     */
+    Set<VariableOrder> makeVarOrders(VariableOrder varOrder);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
new file mode 100644
index 0000000..1ae21e5
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Produces one variable order per variable by repeatedly moving the leading
+ * variable to the end, so that each variable appears at the head of a
+ * variable order exactly once.
+ */
+@ParametersAreNonnullByDefault
+public class ShiftVarOrderFactory implements PcjVarOrderFactory {
+    @Override
+    public Set<VariableOrder> makeVarOrders(final VariableOrder varOrder) {
+        // A mutable copy of the variables that is rotated one position per iteration.
+        final List<String> rotating = Lists.newArrayList( varOrder.getVariableOrders() );
+        final int variableCount = rotating.size();
+
+        final Set<VariableOrder> shiftedOrders = new HashSet<>();
+        for(int shift = 0; shift < variableCount; shift++) {
+            // Snapshot the current rotation as its own variable order.
+            final String[] snapshot = rotating.toArray( new String[variableCount] );
+            shiftedOrders.add( new VariableOrder(snapshot) );
+
+            // Move the head variable to the tail for the next rotation.
+            final String head = rotating.remove(0);
+            rotating.add(head);
+        }
+
+        return shiftedOrders;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
new file mode 100644
index 0000000..ef88d8c
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.Immutable;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An ordered list of {@link BindingSet} variable names. These are used to
+ * specify the order the {@link org.openrdf.query.Binding}s within the set are
+ * serialized to Accumulo. This order affects which rows a prefix scan will hit.
+ * <p>
+ * The String form of a variable order is its variable names joined with
+ * {@link #VAR_ORDER_DELIM}; see {@link #toString()} and
+ * {@link #VariableOrder(String)}.
+ */
+@Immutable
+@ParametersAreNonnullByDefault
+public final class VariableOrder implements Iterable<String> {
+
+    /** Separates the variable names within the String form of a variable order. */
+    public static final String VAR_ORDER_DELIM = ";";
+
+    private final ImmutableList<String> variableOrder;
+
+    /**
+     * Constructs an instance of {@link VariableOrder}.
+     * <p>
+     * Note that a call with exactly one {@code String} argument binds to
+     * {@link #VariableOrder(String)}, which parses its argument, rather than
+     * to this constructor.
+     *
+     * @param varOrder - An ordered array of Binding Set variables. (not null)
+     */
+    public VariableOrder(final String... varOrder) {
+        checkNotNull(varOrder);
+        variableOrder = ImmutableList.copyOf(varOrder);
+    }
+
+    /**
+     * Constructs an instance of {@link VariableOrder}.
+     *
+     * @param varOrder - An ordered collection of Binding Set variables. (not null)
+     */
+    public VariableOrder(final Collection<String> varOrder) {
+        checkNotNull(varOrder);
+        variableOrder = ImmutableList.copyOf(varOrder);
+    }
+
+    /**
+     * Constructs an instance of {@link VariableOrder} by parsing its String
+     * form, which is the variable names separated by {@link #VAR_ORDER_DELIM}.
+     * An empty String is the String form of an order that has no variables.
+     *
+     * @param varOrderString - The String representation of a VariableOrder. (not null)
+     */
+    public VariableOrder(final String varOrderString) {
+        checkNotNull(varOrderString);
+        if(varOrderString.isEmpty()) {
+            // Splitting "" yields a single empty String, which would turn the
+            // String form of an empty order into an order that holds one
+            // unnamed variable and break the toString() round trip.
+            variableOrder = ImmutableList.<String>of();
+        } else {
+            variableOrder = ImmutableList.copyOf( varOrderString.split(VAR_ORDER_DELIM) );
+        }
+    }
+
+    /**
+     * @return An ordered list of Binding Set variables.
+     */
+    public ImmutableList<String> getVariableOrders() {
+        return variableOrder;
+    }
+
+    /**
+     * @return The variable order as an ordered array of Strings. A new array
+     *   is created for every call, so the caller is free to modify it.
+     */
+    public String[] toArray() {
+        return variableOrder.toArray( new String[ variableOrder.size() ] );
+    }
+
+    /**
+     * @return The variable names joined with {@link #VAR_ORDER_DELIM}.
+     */
+    @Override
+    public String toString() {
+        return Joiner.on(VAR_ORDER_DELIM).join(variableOrder);
+    }
+
+    @Override
+    public int hashCode() {
+        return variableOrder.hashCode();
+    }
+
+    /**
+     * Two variable orders are equal when they hold the same variable names
+     * in the same order.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if(this == o) {
+            return true;
+        } else if(o instanceof VariableOrder) {
+            final VariableOrder varOrder = (VariableOrder) o;
+            return variableOrder.equals( varOrder.variableOrder );
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<String> iterator() {
+        return variableOrder.iterator();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java
new file mode 100644
index 0000000..b9f4a1f
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.BindingSet;
+
+/**
+ * Decorates a {@link BindingSet} with a visibility String.
+ */
+@ParametersAreNonnullByDefault
+public class VisibilityBindingSet extends BindingSetDecorator {
+    private static final long serialVersionUID = 1L;
+    private final String visibility;
+
+    // Lazily computed hash; 0 means it has not been computed yet.
+    private volatile int hashCode;
+
+    /**
+     * Decorates a {@link BindingSet} with an empty visibility String.
+     *
+     * @param set - The {@link BindingSet} to decorate.
+     */
+    public VisibilityBindingSet(final BindingSet set) {
+        this(set, "");
+    }
+
+    /**
+     * Creates a new {@link VisibilityBindingSet}.
+     *
+     * @param set - The {@link BindingSet} to decorate.
+     * @param visibility - The visibilities on the {@link BindingSet}. (not null)
+     */
+    public VisibilityBindingSet(final BindingSet set, final String visibility) {
+        super(set);
+        this.visibility = checkNotNull(visibility);
+    }
+
+    /**
+     * @return The visibilities on the {@link BindingSet}. This is the empty
+     *   String when the single argument constructor was used.
+     */
+    public String getVisibility() {
+        return visibility;
+    }
+
+    /**
+     * Two {@link VisibilityBindingSet}s are equal when their decorated
+     * {@link BindingSet}s are equal and their visibilities are equal.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if(this == o) {
+            return true;
+        }
+        if(!(o instanceof VisibilityBindingSet)) {
+            return false;
+        }
+        final VisibilityBindingSet other = (VisibilityBindingSet) o;
+        // Compare the decorated sets with each other. Comparing this set's
+        // decorated set against the other decorator itself only works when
+        // the decorated implementation's equals() tolerates a wrapper.
+        return set.equals(other.set) && visibility.equals(other.getVisibility());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = hashCode;
+        if(result == 0) {
+            // Not cached yet. A hash that really is 0 is simply recomputed
+            // on every call.
+            // NOTE(review): the cached value assumes the decorated set's hash
+            // does not change after the first call - confirm for mutable sets.
+            result = 31 * visibility.hashCode() + super.hashCode();
+            hashCode = result;
+        }
+        return result;
+    }
+
+    /**
+     * @return The superclass's String form followed by a line that reports the visibility.
+     */
+    @Override
+    public String toString() {
+        return new StringBuilder(super.toString())
+                .append("\n Visibility: ")
+                .append(getVisibility())
+                .append("\n")
+                .toString();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java
new file mode 100644
index 0000000..8ff01ac
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Strings;
+
+/**
+ * A {@link BindingSetStringConverter} that also carries the visibility of a
+ * {@link VisibilityBindingSet} through the String form.
+ * <p>
+ * When a binding set has a non-empty visibility, the String form is whatever
+ * the superclass produces, followed by {@link #VISIBILITY_DELIM}, followed by
+ * the visibility. Otherwise it is exactly what the superclass produces.
+ */
+@ParametersAreNonnullByDefault
+public class VisibilityBindingSetStringConverter extends BindingSetStringConverter {
+    /** Separates the bindings from the visibility within the String form. */
+    public static final char VISIBILITY_DELIM = 1;
+
+    /**
+     * Converts a {@link BindingSet} into its String form, appending the
+     * visibility when the set is a {@link VisibilityBindingSet} whose
+     * visibility is neither null nor empty.
+     */
+    @Override
+    public String convert(final BindingSet bindingSet, final VariableOrder varOrder) {
+        final StringBuilder visibilitySuffix = new StringBuilder();
+        if(bindingSet instanceof VisibilityBindingSet) {
+            final String visibility = ((VisibilityBindingSet) bindingSet).getVisibility();
+            if(!Strings.isNullOrEmpty(visibility)) {
+                visibilitySuffix.append(VISIBILITY_DELIM).append(visibility);
+            }
+        }
+        return super.convert(bindingSet, varOrder) + visibilitySuffix;
+    }
+
+    /**
+     * Converts a String form back into a {@link VisibilityBindingSet}. The
+     * text before the first {@link #VISIBILITY_DELIM} is handed to the
+     * superclass; the piece that follows it, when there is one, becomes the
+     * visibility.
+     */
+    @Override
+    public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) {
+        final String[] pieces = bindingSetString.split( String.valueOf(VISIBILITY_DELIM) );
+        final BindingSet bindings = super.convert(pieces[0], varOrder);
+
+        // No second piece means the String did not carry a visibility.
+        return pieces.length > 1
+                ? new VisibilityBindingSet(bindings, pieces[1])
+                : new VisibilityBindingSet(bindings);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
new file mode 100644
index 0000000..5cf01c5
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.update;
+
+import java.util.Collection;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.pcj.storage.PcjException;
+
+import mvm.rya.api.domain.RyaStatement;
+
+/**
+ * Keeps the PCJ indices up to date as {@link RyaStatement}s are added to or
+ * removed from the system.
+ */
+@ParametersAreNonnullByDefault
+public interface PrecomputedJoinUpdater {
+
+    /**
+     * Updates the PCJ indices so that their results include the new statements.
+     *
+     * @param statements - The statements that will be used to update the index. (not null)
+     * @throws PcjUpdateException The statements could not be added to the index.
+     */
+    void addStatements(Collection<RyaStatement> statements) throws PcjUpdateException;
+
+    /**
+     * Updates the PCJ indices so that any results derived from the provided
+     * statements are removed.
+     * <p>
+     * A result is only deleted from the index once it can no longer be
+     * derived from the statements that remain. For example, suppose the
+     * following instructions execute:
+     * <pre>
+     *   Insert Statement A
+     *   Insert Statement B
+     *   Insert Statement C
+     *   A and B Join to create Result A
+     *   B and C Join to create Result A again
+     *   Delete Statement A
+     * </pre>
+     * Result A remains in the index because B and C have not been deleted.
+     * However, if either B or C is then deleted, the result is deleted as
+     * well because it can no longer be derived from the remaining information.
+     *
+     * @param statements - The statements that will be used to update the index. (not null)
+     * @throws PcjUpdateException The statements could not be removed from the index.
+     */
+    void deleteStatements(Collection<RyaStatement> statements) throws PcjUpdateException;
+
+    /**
+     * Forces an updater that batches its work to flush immediately.
+     *
+     * @throws PcjUpdateException The updater could not be flushed.
+     */
+    void flush() throws PcjUpdateException;
+
+    /**
+     * Cleans up any resources required to perform the updates (sockets, streams, etc).
+     *
+     * @throws PcjUpdateException The updater could not be closed.
+     */
+    void close() throws PcjUpdateException;
+
+    /**
+     * An operation of {@link PrecomputedJoinUpdater} failed.
+     */
+    class PcjUpdateException extends PcjException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs a new exception with the specified detail message. The
+         * cause is not initialized, and may subsequently be initialized by a
+         * call to {@link Throwable#initCause(java.lang.Throwable)}.
+         *
+         * @param message - The detail message. The detail message is saved for
+         *   later retrieval by the {@link Throwable#getMessage()} method.
+         */
+        public PcjUpdateException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs a new exception with the specified detail message and cause.
+         * <p>
+         * Note that the detail message associated with cause is not
+         * automatically incorporated in this exception's detail message.
+         *
+         * @param message - The detail message (which is saved for later
+         *   retrieval by the {@link Throwable#getMessage()} method).
+         * @param cause - The cause (which is saved for later retrieval by the
+         *   {@link Throwable#getCause()} method). (A null value is permitted,
+         *   and indicates that the cause is nonexistent or unknown.)
+         */
+        public PcjUpdateException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file