Posted to commits@rya.apache.org by pu...@apache.org on 2016/05/13 16:42:45 UTC

[3/6] incubator-rya git commit: RYA-64 - Integrated Rya PCJ Secondary Index support into core Rya.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
new file mode 100644
index 0000000..83d1c40
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.model.Value;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.primitives.Bytes;
+
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+
+/**
+ * Converts {@link BindingSet}s to byte[]s and back again. The bytes do not
+ * include the binding names and are ordered with a {@link VariableOrder}.
+ */
+@ParametersAreNonnullByDefault
+public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> {
+
+    @Override
+    public byte[] convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException {
+        checkNotNull(bindingSet);
+        checkNotNull(varOrder);
+        checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+
+        // A list that holds all of the byte segments that will be concatenated at the end.
+        // This minimizes byte[] construction.
+        final List<byte[]> byteSegments = new LinkedList<>();
+
+        try {
+            for(final String varName: varOrder) {
+                // Only write information for a variable name if the binding set contains it.
+                if(bindingSet.hasBinding(varName)) {
+                    final RyaType rt = RdfToRyaConversions.convertValue(bindingSet.getBinding(varName).getValue());
+                    final byte[][] serializedVal = RyaContext.getInstance().serializeType(rt);
+                    byteSegments.add(serializedVal[0]);
+                    byteSegments.add(serializedVal[1]);
+                }
+
+                // But always write the value delimiter. If a value is missing, you'll see two delimiters next to each other.
+                byteSegments.add(DELIM_BYTES);
+            }
+
+            return concat(byteSegments);
+        } catch (RyaTypeResolverException e) {
+            throw new BindingSetConversionException("Could not convert the BindingSet into a byte[].", e);
+        }
+    }
+
+    @Override
+    public BindingSet convert(byte[] bindingSetBytes, VariableOrder varOrder) throws BindingSetConversionException {
+        checkNotNull(bindingSetBytes);
+        checkNotNull(varOrder);
+
+        try {
+            // Slice the row into bindings.
+            List<byte[]> values = splitByDelimByte(bindingSetBytes);
+            String[] varOrderStrings = varOrder.toArray();
+            checkArgument(values.size() == varOrderStrings.length);
+
+            // Convert the Binding bytes into a BindingSet.
+            final QueryBindingSet bindingSet = new QueryBindingSet();
+
+            for(int i = 0; i < varOrderStrings.length; i++) {
+                byte[] valueBytes = values.get(i);
+                if(valueBytes.length > 0) {
+                    String name = varOrderStrings[i];
+                    Value value = deserializeValue(valueBytes);
+                    bindingSet.addBinding(name, value);
+                }
+            }
+
+            return bindingSet;
+        } catch (RyaTypeResolverException e) {
+            throw new BindingSetConversionException("Could not convert the byte[] into a BindingSet.", e);
+        }
+    }
+
+    /**
+     * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
+     * are a subset of the variables names in {@link VariableOrder}.
+     *
+     * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
+     * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
+     * @throws IllegalArgumentException Indicates the names of the bindings are
+     *   not a subset of the variable order.
+     */
+    private static void checkBindingsSubsetOfVarOrder(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException {
+        checkNotNull(bindingSet);
+        checkNotNull(varOrder);
+
+        Set<String> bindingNames = bindingSet.getBindingNames();
+        List<String> varNames = varOrder.getVariableOrders();
+        checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
+    }
+
+    private static final byte[] concat(Iterable<byte[]> byteSegments) {
+        checkNotNull(byteSegments);
+
+        // Allocate a byte array that is able to hold the segments.
+        int length = 0;
+        for(byte[] byteSegment : byteSegments) {
+            length += byteSegment.length;
+        }
+        byte[] result = new byte[length];
+
+        // Copy the segments to the byte array and return it.
+        ByteBuffer buff = ByteBuffer.wrap(result);
+        for(byte[] byteSegment : byteSegments) {
+            buff.put(byteSegment);
+        }
+        return result;
+    }
+
+    private static List<byte[]> splitByDelimByte(byte[] bindingSetBytes) {
+        checkNotNull(bindingSetBytes);
+
+        List<byte[]> values = new LinkedList<>();
+
+        ByteBuffer buff = ByteBuffer.wrap(bindingSetBytes);
+        int start = 0;
+        while(buff.hasRemaining()) {
+            if(buff.get() == DELIM_BYTE) {
+                // Mark the position of the value delimiter.
+                int end = buff.position();
+
+                // Move to the start of the value and copy the bytes into an array.
+                byte[] valueBytes = new byte[(end - start) -1];
+                buff.position(start);
+                buff.get(valueBytes);
+                buff.position(end);
+                values.add(valueBytes);
+
+                // Move the start of the next value to the end of this one.
+                start = end;
+            }
+        }
+
+        return values;
+    }
+
+    private static Value deserializeValue(byte[] byteVal) throws RyaTypeResolverException {
+         final int typeIndex = Bytes.indexOf(byteVal, TYPE_DELIM_BYTE);
+         checkArgument(typeIndex >= 0);
+         final byte[] data = Arrays.copyOf(byteVal, typeIndex);
+         final byte[] type = Arrays.copyOfRange(byteVal, typeIndex, byteVal.length);
+         final RyaType rt = RyaContext.getInstance().deserialize(Bytes.concat(data,type));
+         return RyaToRdfConversions.convertValue(rt);
+    }
+}
\ No newline at end of file

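A quick round-trip sketch of the serializer above, for illustration only (not part of the commit): the binding names are hypothetical, imports are omitted, and the semicolon-delimited VariableOrder constructor mirrors the usage shown in the PcjTables Javadoc later in this patch.

    // Serialize a BindingSet to a byte[] ordered by a VariableOrder, then back again.
    public static void roundTripExample() throws BindingSetConversionException {
        final ValueFactory vf = new ValueFactoryImpl();
        final QueryBindingSet original = new QueryBindingSet();
        original.addBinding("name", vf.createURI("http://Alice"));
        original.addBinding("age", vf.createLiteral(14));

        // The byte[] form is ordered by this variable order and omits the binding names.
        final VariableOrder varOrder = new VariableOrder("name;age");
        final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();

        final byte[] bytes = serializer.convert(original, varOrder);
        final BindingSet roundTripped = serializer.convert(bytes, varOrder);
        // roundTripped should hold the same "name" and "age" bindings as original.
    }
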
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
new file mode 100644
index 0000000..9a52531
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Converts {@link BindingSet}s into other representations. This interface is
+ * intended to convert between a BindingSet and whatever format it is being
+ * stored as. These formats are often optimized for query evaluation.
+ *
+ * @param <T> Defines the type of model {@link BindingSet}s will be converted into/from.
+ */
+@ParametersAreNonnullByDefault
+public interface BindingSetConverter<T> {
+
+   /**
+    * Converts a {@link BindingSet} into the target model. The target model
+    * may not include every {@link Binding} that was in the original BindingSet,
+    * it may not include the binding names, and it may reorder the binding values.
+    * All of this information is specified using a {@link VariableOrder}.
+    * </p>
+    * Because the resulting model may not include the binding names from the
+    * original object, you must hold onto that information if you want to
+    * convert the resulting model back into a BindingSet later. Because the
+    * resulting model may only contain a subset of the original BindingSet's
+    * bindings, some information may be lost, so you may not be able to convert
+    * the target model back into the original BindingSet.
+    *
+    * @param bindingSet - The BindingSet that will be converted. (not null)
+    * @param varOrder - Which bindings and in what order they will appear in the
+    *   resulting model. (not null)
+    * @return The BindingSet formatted as the target model.
+    * @throws BindingSetConversionException The BindingSet was unable to be
+    *   converted into the target model. This will happen if the BindingSet has
+    *   a binding whose name is not in the VariableOrder or if one of the values
+    *   could not be converted into the target model.
+    */
+   public T convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException;
+
+   /**
+    * Converts the target model representation of a {@link BindingSet} as is
+    * created by {@link #convert(BindingSet, VariableOrder)} back into a
+    * BindingSet.
+    * </p>
+    * You must provide the binding names and the order in which they were
+    * written by using a {@link VariableOrder}.
+    * </p>
+    * If there is no value for one of the variable order names, then that binding
+    * will be missing from the resulting BindingSet.
+    *
+    * @param bindingSet - The BindingSet formatted as the target model that will
+    *   be converted. (not null)
+    * @param varOrder - The VariableOrder that was used to create the target model. (not null)
+    * @return The {@link BindingSet} representation of the target model.
+    * @throws BindingSetConversionException The target model was unable to be
+    *   converted back into a BindingSet.
+    */
+   public BindingSet convert(T bindingSet, VariableOrder varOrder) throws BindingSetConversionException;
+
+   /**
+    * One of the conversion methods of {@link BindingSetConverter} was unable to
+    * convert the {@link BindingSet} to/from the converted model.
+    */
+   public static class BindingSetConversionException extends Exception {
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Constructs an instance of {@link BindingSetConversionException}.
+        *
+        * @param message - Describes why this exception was thrown.
+        */
+       public BindingSetConversionException(final String message) {
+           super(message);
+       }
+
+       /**
+        * Constructs an instance of {@link BindingSetConversionException}.
+        *
+        * @param message - Describes why this exception was thrown.
+        * @param cause - The exception that caused this one to be thrown.
+        */
+       public BindingSetConversionException(final String message, final Throwable cause) {
+           super(message, cause);
+       }
+   }
+}
\ No newline at end of file

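The contract above is easiest to see with a toy implementation. The converter below is illustrative only and not part of this commit: it targets List<Value>, keeping the values in variable order, dropping the binding names, and using null slots for names the BindingSet does not bind (imports from org.openrdf.* and java.util are assumed).

    /** Illustrative only: converts BindingSets to Lists of Values ordered by a VariableOrder. */
    public class ListBindingSetConverter implements BindingSetConverter<List<Value>> {

        @Override
        public List<Value> convert(final BindingSet bindingSet, final VariableOrder varOrder) {
            // Values appear in variable order; unbound names become null placeholders.
            final List<Value> values = new ArrayList<>();
            for(final String varName : varOrder) {
                values.add( bindingSet.hasBinding(varName) ? bindingSet.getValue(varName) : null );
            }
            return values;
        }

        @Override
        public BindingSet convert(final List<Value> model, final VariableOrder varOrder) {
            // Rebuild the BindingSet using the names the caller kept in the VariableOrder.
            final QueryBindingSet bindingSet = new QueryBindingSet();
            final String[] names = varOrder.toArray();
            for(int i = 0; i < names.length; i++) {
                if(model.get(i) != null) {
                    bindingSet.addBinding(names[i], model.get(i));
                }
            }
            return bindingSet;
        }
    }
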
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java
new file mode 100644
index 0000000..ddc7be5
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetDecorator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.openrdf.model.Value;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Abstracts out the decoration of a {@link BindingSet}.
+ */
+public abstract class BindingSetDecorator implements BindingSet {
+    private static final long serialVersionUID = 1L;
+    protected final BindingSet set;
+    private volatile int hashCode;
+
+    /**
+     * Constructs a new {@link BindingSetDecorator}, decorating the provided
+     * {@link BindingSet}.
+     * @param set - The {@link BindingSet} to be decorated. (not null)
+     */
+    public BindingSetDecorator(final BindingSet set) {
+        this.set = checkNotNull(set);
+    }
+
+    @Override
+    public Iterator<Binding> iterator() {
+        return set.iterator();
+    }
+
+    @Override
+    public Set<String> getBindingNames() {
+        return set.getBindingNames();
+    }
+
+    @Override
+    public Binding getBinding(final String bindingName) {
+        return set.getBinding(bindingName);
+    }
+
+    @Override
+    public boolean hasBinding(final String bindingName) {
+        return set.hasBinding(bindingName);
+    }
+
+    @Override
+    public Value getValue(final String bindingName) {
+        return set.getValue(bindingName);
+    }
+
+    @Override
+    public int size() {
+        return set.size();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if(!(o instanceof BindingSetDecorator)) {
+            return false;
+        }
+        final BindingSetDecorator other = (BindingSetDecorator) o;
+        return set.equals(other.set);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = hashCode;
+        if(result == 0) {
+            result = 31 * result + set.hashCode();
+            hashCode = result;
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("  names: ");
+        for (final String name : getBindingNames()) {
+            sb.append("\n    [name]: " + name + "  ---  [value]: " + getBinding(name).getValue().toString());
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file

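The decorator above lets subclasses attach extra state to a BindingSet while delegating every BindingSet call to the wrapped set; VisibilityBindingSet, used later in this patch, is one such subclass. A hypothetical subclass, for illustration only:

    /** Illustrative only: a decorator that attaches an arbitrary label to a BindingSet. */
    public class LabeledBindingSet extends BindingSetDecorator {
        private static final long serialVersionUID = 1L;

        private final String label;

        public LabeledBindingSet(final BindingSet set, final String label) {
            super(set);
            this.label = label;
        }

        public String getLabel() {
            return label;
        }
    }
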
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
new file mode 100644
index 0000000..913cfc9
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+
+/**
+ * Converts {@link BindingSet}s to Strings and back again. The Strings do not
+ * include the binding names and are ordered with a {@link VariableOrder}.
+ */
+@ParametersAreNonnullByDefault
+public class BindingSetStringConverter implements BindingSetConverter<String> {
+
+    public static final String BINDING_DELIM = ":::";
+    public static final String TYPE_DELIM = "<<~>>";
+    public static final String NULL_VALUE_STRING = Character.toString( '\0' );
+
+    private static final ValueFactory valueFactory = new ValueFactoryImpl();
+
+    @Override
+    public String convert(final BindingSet bindingSet, final VariableOrder varOrder) {
+        checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+
+        // Convert each Binding to a String.
+        final List<String> bindingStrings = new ArrayList<>();
+        for(final String varName : varOrder) {
+            if(bindingSet.hasBinding(varName)) {
+                // Add a value to the binding set.
+                final Value value = bindingSet.getBinding(varName).getValue();
+                final RyaType ryaValue = RdfToRyaConversions.convertValue(value);
+                final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
+                bindingStrings.add(bindingString);
+            } else {
+                // Add a null value to the binding set.
+                bindingStrings.add(NULL_VALUE_STRING);
+            }
+        }
+
+        // Join the bindings using the binding delim.
+        return Joiner.on(BINDING_DELIM).join(bindingStrings);
+    }
+
+    /**
+     * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
+     * are a subset of the variables names in {@link VariableOrder}.
+     *
+     * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
+     * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
+     * @throws IllegalArgumentException Indicates the names of the bindings are
+     *   not a subset of the variable order.
+     */
+    private static void checkBindingsSubsetOfVarOrder(final BindingSet bindingSet, final VariableOrder varOrder) throws IllegalArgumentException {
+        checkNotNull(bindingSet);
+        checkNotNull(varOrder);
+
+        final Set<String> bindingNames = bindingSet.getBindingNames();
+        final List<String> varNames = varOrder.getVariableOrders();
+        checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
+    }
+
+    @Override
+    public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) {
+        checkNotNull(bindingSetString);
+        checkNotNull(varOrder);
+
+        final String[] bindingStrings = bindingSetString.split(BINDING_DELIM);
+        final String[] varOrderArr = varOrder.toArray();
+        checkArgument(varOrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder.");
+
+        final QueryBindingSet bindingSet = new QueryBindingSet();
+        for(int i = 0; i < bindingStrings.length; i++) {
+            final String bindingString = bindingStrings[i];
+            if(!NULL_VALUE_STRING.equals(bindingString)) {
+                final String name = varOrderArr[i];
+                final Value value = toValue(bindingStrings[i]);
+                bindingSet.addBinding(name, value);
+            }
+        }
+        return bindingSet;
+    }
+
+    /**
+     * Creates a {@link Value} from a String representation of it.
+     *
+     * @param valueString - The String representation of the value. (not null)
+     * @return The {@link Value} representation of the String.
+     */
+    protected static Value toValue(final String valueString) {
+        checkNotNull(valueString);
+
+        // Split the String that was stored in Fluo into its Value and Type parts.
+        final String[] valueAndType = valueString.split(TYPE_DELIM);
+        if(valueAndType.length != 2) {
+            throw new IllegalArgumentException("Array must contain data and type info!");
+        }
+
+        final String dataString = valueAndType[0];
+        final String typeString = valueAndType[1];
+
+        // Convert the String Type into a URI that describes the type.
+        final URI typeURI = valueFactory.createURI(typeString);
+
+        // Convert the String Value into a Value.
+        final Value value = typeURI.equals(XMLSchema.ANYURI) ?
+                valueFactory.createURI(dataString) :
+                valueFactory.createLiteral(dataString, new URIImpl(typeString));
+
+        return value;
+    }
+}
\ No newline at end of file

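As with the byte[] serializer earlier in this patch, the String converter round-trips a BindingSet against a VariableOrder; an unbound name is written as the NULL_VALUE_STRING placeholder and stays unbound after converting back. A minimal sketch, illustration only (hypothetical binding names, imports omitted):

    public static void stringRoundTripExample() {
        final ValueFactory vf = new ValueFactoryImpl();
        final QueryBindingSet original = new QueryBindingSet();
        original.addBinding("worker", vf.createURI("http://Bob"));

        // "city" is in the variable order but unbound, so its slot holds NULL_VALUE_STRING.
        final VariableOrder varOrder = new VariableOrder("worker;city");
        final BindingSetStringConverter converter = new BindingSetStringConverter();

        final String asString = converter.convert(original, varOrder);
        final BindingSet roundTripped = converter.convert(asString, varOrder);
        // roundTripped binds "worker" and has no binding for "city".
    }
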
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
new file mode 100644
index 0000000..653b4cc
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.UUID;
+
+/**
+ * Creates Accumulo table names that may be recognized by Rya as tables that
+ * hold the results of a Precomputed Join.
+ */
+public class PcjTableNameFactory {
+
+    /**
+     * Creates an Accumulo table name that may be recognized by Rya as a table
+     * that holds the results of a Precomputed Join.
+     * </p>
+     * An Accumulo cluster may host more than one Rya instance. To ensure each
+     * Rya instance's RDF Triples are segregated from each other, they are stored
+     * within different Accumulo tables. This is accomplished by prepending a
+     * {@code tablePrefix} to every table that is owned by a Rya instance. Each
+     * PCJ table is owned by a specific Rya instance, so it too must be prepended
+     * with the instance's {@code tablePrefix}.
+     * </p>
+     * When Rya scans for PCJ tables that it may use when creating execution plans,
+     * it looks for any table in Accumulo that has a name starting with its
+     * {@code tablePrefix} immediately followed by "INDEX". Anything following
+     * that portion of the table name is just a unique identifier for the SPARQL
+     * query that is being precomputed. Here's an example of what a table name
+     * may look like:
+     * <pre>
+     *     demo_INDEX_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9
+     * </pre>
+     * The "demo_INDEX" portion indicates this table is a PCJ table for the "demo_"
+     * instance of Rya. The "_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9" portion
+     * could be anything at all that uniquely identifies the query that is being updated.
+     *
+     * @param tablePrefix - The Rya instance's table prefix. (not null)
+     * @param uniqueId - The unique portion of the Rya PCJ table name. (not null)
+     * @return A Rya PCJ table name built using the provided values.
+     */
+    public String makeTableName(final String tablePrefix, final String uniqueId) {
+        return tablePrefix + "INDEX_" + uniqueId;
+    }
+
+    /**
+     * Invokes {@link #makeTableName(String, String)} with a randomly generated
+     * UUID as the {@code uniqueId}.
+     *
+     * @param tablePrefix - The Rya instance's table prefix. (not null)
+     * @return A Rya PCJ table name built using the provided values.
+     */
+    public String makeTableName(final String tablePrefix) {
+        final String uniqueId = UUID.randomUUID().toString().replaceAll("-", "");
+        return makeTableName(tablePrefix, uniqueId);
+    }
+
+}
\ No newline at end of file

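Tying the Javadoc above to concrete values, this is how the factory behaves for the hypothetical "demo_" Rya instance used in its documentation:

    final PcjTableNameFactory tableNames = new PcjTableNameFactory();

    // Explicit identifier; mirrors the example from the Javadoc above.
    final String named = tableNames.makeTableName("demo_", "QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9");
    // named is "demo_INDEX_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9"

    // Randomly generated identifier (a UUID with the dashes stripped).
    final String generated = tableNames.makeTableName("demo_");
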
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
new file mode 100644
index 0000000..db74996
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.lexicoder.ListLexicoder;
+import org.apache.accumulo.core.client.lexicoder.LongLexicoder;
+import org.apache.accumulo.core.client.lexicoder.StringLexicoder;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+
+import com.google.common.base.Optional;
+
+/**
+ * Functions that create and maintain the PCJ tables that are used by Rya.
+ */
+@ParametersAreNonnullByDefault
+public class PcjTables {
+    private static final Logger log = Logger.getLogger(PcjTables.class);
+
+    /**
+     * The Row ID of all {@link PcjMetadata} entries that are stored in Accumulo.
+     */
+    private static final Text PCJ_METADATA_ROW_ID = new Text("pcjMetadata");
+
+    /**
+     * The Column Family for all PCJ metadata entries.
+     */
+    private static final Text PCJ_METADATA_FAMILY = new Text("metadata");
+
+    /**
+     * The Column Qualifier for the SPARQL query a PCJ is built from.
+     */
+    private static final Text PCJ_METADATA_SPARQL_QUERY = new Text("sparql");
+
+    /**
+     * The Column Qualifier for the cardinality of a PCJ.
+     */
+    private static final Text PCJ_METADATA_CARDINALITY = new Text("cardinality");
+
+    /**
+     * The Column Qualifier for the various variable orders a PCJ's results are written to.
+     */
+    private static final Text PCJ_METADATA_VARIABLE_ORDERS = new Text("variableOrders");
+
+    // Lexicoders used to read/write PcjMetadata to/from Accumulo.
+    private static final LongLexicoder longLexicoder = new LongLexicoder();
+    private static final StringLexicoder stringLexicoder = new StringLexicoder();
+    private static final ListLexicoder<String> listLexicoder = new ListLexicoder<>(stringLexicoder);
+
+    /**
+     * Create a new PCJ table within an Accumulo instance for a SPARQL query.
+     * For example, calling the function like this:
+     * <pre>
+     * PcjTables.createPcjTable(
+     *     accumuloConn,
+     *
+     *     "foo_INDEX_query1234",
+     *
+     *     Sets.newHashSet(
+     *         new VariableOrder("city;worker;customer"),
+     *         new VariableOrder("worker;customer;city") ,
+     *         new VariableOrder("customer;city;worker")),
+     *
+     *     "SELECT ?customer ?worker ?city { " +
+     *            "?customer &lt;http://talksTo> ?worker. " +
+     *            "?worker &lt;http://livesIn> ?city. " +
+     *            "?worker &lt;http://worksAt> &lt;http://Home>. " +
+     *     "}");
+     * </pre>
+     * </p>
+     * Will result in an Accumulo table named "foo_INDEX_query1234" with the following entries:
+     * <table border="1" style="width:100%">
+     *   <tr> <th>Row ID</td>  <th>Column</td>  <th>Value</td> </tr>
+     *   <tr> <td>pcjMetadata</td> <td>metadata:sparql</td> <td> ... UTF-8 bytes encoding the query string ... </td> </tr>
+     *   <tr> <td>pcjMetadata</td> <td>metadata:cardinality</td> <td> The query's cardinality </td> </tr>
+     *   <tr> <td>pcjMetadata</td> <td>metadata:variableOrders</td> <td> The variable orders the results are written to </td> </tr>
+     * </table>
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the table that will be created. (not null)
+     * @param varOrders - The variable orders the results within the table will be written to. (not null)
+     * @param sparql - The SPARQL query whose results this table holds. (not null)
+     * @throws PCJStorageException Could not create a new PCJ table either because Accumulo
+     *   would not let us create it or the PCJ metadata was not able to be written to it.
+     */
+    public void createPcjTable(
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final Set<VariableOrder> varOrders,
+            final String sparql) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        checkNotNull(varOrders);
+        checkNotNull(sparql);
+
+        final TableOperations tableOps = accumuloConn.tableOperations();
+        if(!tableOps.exists(pcjTableName)) {
+            try {
+                // Create the new table in Accumulo.
+                tableOps.create(pcjTableName);
+
+                // Write the PCJ Metadata to the newly created table.
+                final PcjMetadata pcjMetadata = new PcjMetadata(sparql, 0L, varOrders);
+                final List<Mutation> mutations = makeWriteMetadataMutations(pcjMetadata);
+
+                final BatchWriter writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+                writer.addMutations(mutations);
+                writer.close();
+            } catch (final TableExistsException e) {
+                log.warn("Something else just created the Rya PCJ export table named '" + pcjTableName
+                        + "'. This is unexpected, but we will continue as normal.");
+            } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+                throw new PCJStorageException("Could not create a new PCJ named: " + pcjTableName, e);
+            }
+        }
+    }
+
+    /**
+     * Create the {@link Mutation}s required to write a {@link PcjMetadata} object
+     * to an Accumulo table.
+     *
+     * @param metadata - The metadata to write. (not null)
+     * @return An ordered list of mutations that write the metadata to an Accumulo table.
+     */
+    private static List<Mutation> makeWriteMetadataMutations(final PcjMetadata metadata) {
+        checkNotNull(metadata);
+
+        final List<Mutation> mutations = new LinkedList<>();
+
+        // SPARQL Query
+        Mutation mutation = new Mutation(PCJ_METADATA_ROW_ID);
+        final Value query = new Value( stringLexicoder.encode(metadata.getSparql()) );
+        mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_SPARQL_QUERY, query);
+        mutations.add(mutation);
+
+        // Cardinality
+        mutation = new Mutation(PCJ_METADATA_ROW_ID);
+        final Value cardinality = new Value( longLexicoder.encode(new Long(metadata.getCardinality())) );
+        mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, cardinality);
+        mutations.add(mutation);
+
+        //  Variable Orders
+        final List<String> varOrderStrings = new ArrayList<>();
+        for(final VariableOrder varOrder : metadata.getVarOrders()) {
+            varOrderStrings.add( varOrder.toString() );
+        }
+
+        mutation = new Mutation(PCJ_METADATA_ROW_ID);
+        final Value variableOrders = new Value( listLexicoder.encode(varOrderStrings) );
+        mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_VARIABLE_ORDERS, variableOrders);
+        mutations.add(mutation);
+
+        return mutations;
+    }
+
+    /**
+     * Fetch the {@link PcjMetadata} from an Accumulo table.
+     * <p>
+     * This method assumes the PCJ table has already been created.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the table that will be searched. (not null)
+     * @return The PCJ metadata that has been stored in the PCJ table.
+     * @throws PCJStorageException The PCJ Table does not exist.
+     */
+    public PcjMetadata getPcjMetadata(
+            final Connector accumuloConn,
+            final String pcjTableName) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+
+        try {
+            // Create an Accumulo scanner that iterates through the metadata entries.
+            final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
+            final Iterator<Entry<Key, Value>> entries = scanner.iterator();
+
+            // No metadata has been stored in the table yet.
+            if(!entries.hasNext()) {
+                throw new PCJStorageException("Could not find any PCJ metadata in the table named: " + pcjTableName);
+            }
+
+            // Fetch the metadata from the entries. Assuming they all have the same cardinality and sparql query.
+            String sparql = null;
+            Long cardinality = null;
+            final Set<VariableOrder> varOrders = new HashSet<>();
+
+            while(entries.hasNext()) {
+                final Entry<Key, Value> entry = entries.next();
+                final Text columnQualifier = entry.getKey().getColumnQualifier();
+                final byte[] value = entry.getValue().get();
+
+                if(columnQualifier.equals(PCJ_METADATA_SPARQL_QUERY)) {
+                    sparql = stringLexicoder.decode(value);
+                } else if(columnQualifier.equals(PCJ_METADATA_CARDINALITY)) {
+                    cardinality = longLexicoder.decode(value);
+                } else if(columnQualifier.equals(PCJ_METADATA_VARIABLE_ORDERS)) {
+                    for(final String varOrderStr : listLexicoder.decode(value)) {
+                        varOrders.add( new VariableOrder(varOrderStr) );
+                    }
+                }
+            }
+
+            return new PcjMetadata(sparql, cardinality, varOrders);
+
+        } catch (final TableNotFoundException e) {
+            throw new PCJStorageException("Could not add results to a PCJ because the PCJ table does not exist.", e);
+        }
+    }
+
+    /**
+     * Add a collection of results to a PCJ table. The table's cardinality will
+     * be updated to include the new results.
+     * <p>
+     * This method assumes the PCJ table has already been created.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+     * @param results - Binding sets that will be written to the PCJ table. (not null)
+     * @throws PCJStorageException The provided PCJ table doesn't exist, is missing the
+     *   PCJ metadata, or the result could not be written to it.
+     */
+    public void addResults(
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final Collection<VisibilityBindingSet> results) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        checkNotNull(results);
+
+        // Write a result to each of the variable orders that are in the table.
+        writeResults(accumuloConn, pcjTableName, results);
+
+        // Increment the cardinality of the query by the number of new results.
+        updateCardinality(accumuloConn, pcjTableName, results.size());
+    }
+
+    /**
+     * Add a collection of results to a specific PCJ table.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+     * @param results - Binding sets that will be written to the PCJ table. (not null)
+     * @throws PCJStorageException The provided PCJ table doesn't exist, is missing the
+     *   PCJ metadata, or the result could not be written to it.
+     */
+    private void writeResults(
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final Collection<VisibilityBindingSet> results) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        checkNotNull(results);
+
+        // Fetch the variable orders from the PCJ table.
+        final PcjMetadata metadata = getPcjMetadata(accumuloConn, pcjTableName);
+
+        // Write each result formatted using each of the variable orders.
+        BatchWriter writer = null;
+        try {
+            writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+            for(final VisibilityBindingSet result : results) {
+                final Set<Mutation> addResultMutations = makeWriteResultMutations(metadata.getVarOrders(), result);
+                writer.addMutations( addResultMutations );
+            }
+        } catch (TableNotFoundException | MutationsRejectedException e) {
+            throw new PCJStorageException("Could not add results to the PCJ table named: " + pcjTableName, e);
+        } finally {
+            if(writer != null) {
+                try {
+                    writer.close();
+                } catch (final MutationsRejectedException e) {
+                    throw new PCJStorageException("Could not add results to a PCJ table because some of the mutations were rejected.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Create the {@link Mutation}s required to write a new {@link BindingSet}
+     * to a PCJ table for each {@link VariableOrder} that is provided.
+     *
+     * @param varOrders - The variable orders the result will be written to. (not null)
+     * @param result - A new PCJ result. (not null)
+     * @return The mutations that will write the result to a PCJ table.
+     * @throws PCJStorageException The binding set could not be encoded.
+     */
+    private static Set<Mutation> makeWriteResultMutations(
+            final Set<VariableOrder> varOrders,
+            final VisibilityBindingSet result) throws PCJStorageException {
+        checkNotNull(varOrders);
+        checkNotNull(result);
+
+        final Set<Mutation> mutations = new HashSet<>();
+        final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
+        for(final VariableOrder varOrder : varOrders) {
+            try {
+                // Serialize the result to the variable order.
+                final byte[] serializedResult = converter.convert(result, varOrder);
+
+                // Row ID = binding set values, Column Family = variable order of the binding set.
+                final Mutation addResult = new Mutation(serializedResult);
+                final String visibility = result.getVisibility();
+                addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), "");
+                mutations.add(addResult);
+            } catch(final BindingSetConversionException e) {
+                throw new PCJStorageException("Could not serialize a result.", e);
+            }
+        }
+
+        return mutations;
+    }
+
+    /**
+     * Update the cardinality of a PCJ by a {@code delta}.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the PCJ table that will have its cardinality updated. (not null)
+     * @param delta - How much the cardinality will change.
+     * @throws PCJStorageException The cardinality could not be updated.
+     */
+    private void updateCardinality(
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final long delta) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+
+        ConditionalWriter conditionalWriter = null;
+        try {
+            conditionalWriter = accumuloConn.createConditionalWriter(pcjTableName, new ConditionalWriterConfig());
+
+            boolean updated = false;
+            while(!updated) {
+                // Write the conditional update request to Accumulo.
+                final long cardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality();
+                final ConditionalMutation mutation = makeUpdateCardinalityMutation(cardinality, delta);
+                final ConditionalWriter.Result result = conditionalWriter.write(mutation);
+
+                // Interpret the result.
+                switch(result.getStatus()) {
+                    case ACCEPTED:
+                        updated = true;
+                        break;
+                    case REJECTED:
+                        break;
+                    case UNKNOWN:
+                        // We do not know if the mutation succeeded. If the metadata hasn't changed since we originally
+                        // fetched it, try again. Otherwise, continue forward as if the update worked. It's okay if this
+                        // number is slightly off.
+                        final long newCardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality();
+                        if(newCardinality != cardinality) {
+                            updated = true;
+                        }
+                        break;
+                    case VIOLATED:
+                        throw new PCJStorageException("The cardinality could not be updated because the commit violated a table constraint.");
+                    case INVISIBLE_VISIBILITY:
+                        throw new PCJStorageException("The condition contains a visibility the updater can not satisfy.");
+                }
+            }
+        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+            throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e);
+        } finally {
+            if(conditionalWriter != null) {
+                conditionalWriter.close();
+            }
+        }
+    }
+
+    /**
+     * Creates a {@link ConditionalMutation} that only updates the cardinality
+     * of the PCJ table if the old value has not changed by the time this mutation
+     * is committed to Accumulo.
+     *
+     * @param current - The current cardinality value.
+     * @param delta - How much the cardinality will change.
+     * @return The mutation that will perform the conditional update.
+     */
+    private static ConditionalMutation makeUpdateCardinalityMutation(final long current, final long delta) {
+        // Try to update the cardinality by the delta.
+        final ConditionalMutation mutation = new ConditionalMutation(PCJ_METADATA_ROW_ID);
+        final Condition lastCardinalityStillCurrent = new Condition(
+                PCJ_METADATA_FAMILY,
+                PCJ_METADATA_CARDINALITY);
+
+        // Require the old cardinality to be the value we just read.
+        final byte[] currentCardinalityBytes = longLexicoder.encode( current );
+        lastCardinalityStillCurrent.setValue( currentCardinalityBytes );
+        mutation.addCondition(lastCardinalityStillCurrent);
+
+        // If that is the case, then update to the new value.
+        final Value newCardinality = new Value( longLexicoder.encode(current + delta) );
+        mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, newCardinality);
+        return mutation;
+    }
+
+    /**
+     * Scan Rya for results that solve the PCJ's query and store them in the PCJ table.
+     * <p>
+     * This method assumes the PCJ table has already been created.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+     * @param ryaConn - A connection to the Rya store that will be queried to find results. (not null)
+     * @throws PCJStorageException If results could not be written to the PCJ table,
+     *   the PCJ table does not exist, or the query that is being executed
+     *   was malformed.
+     */
+    public void populatePcj(
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final RepositoryConnection ryaConn) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        checkNotNull(ryaConn);
+
+        try {
+            // Fetch the query that needs to be executed from the PCJ table.
+            final PcjMetadata pcjMetadata = getPcjMetadata(accumuloConn, pcjTableName);
+            final String sparql = pcjMetadata.getSparql();
+
+            // Query Rya for results to the SPARQL query.
+            final TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql);
+            final TupleQueryResult results = query.evaluate();
+
+            // Load batches of 1000 of them at a time into the PCJ table
+            final Set<VisibilityBindingSet> batch = new HashSet<>(1000);
+            while(results.hasNext()) {
+                batch.add( new VisibilityBindingSet(results.next()) );
+
+                if(batch.size() == 1000) {
+                    addResults(accumuloConn, pcjTableName, batch);
+                    batch.clear();
+                }
+            }
+
+            if(!batch.isEmpty()) {
+                addResults(accumuloConn, pcjTableName, batch);
+            }
+
+        } catch (RepositoryException | MalformedQueryException | QueryEvaluationException e) {
+            throw new PCJStorageException("Could not populate a PCJ table with Rya results for the table named: " + pcjTableName, e);
+        }
+    }
+
+    private static final PcjVarOrderFactory DEFAULT_VAR_ORDER_FACTORY = new ShiftVarOrderFactory();
+
+    /**
+     * Creates a new PCJ Table in Accumulo and populates it by scanning an
+     * instance of Rya for historic matches.
+     * <p>
+     * If any portion of this operation fails along the way, the partially
+     * created PCJ table will be left in Accumulo.
+     *
+     * @param ryaConn - Connects to the Rya that will be scanned. (not null)
+     * @param accumuloConn - Connects to the accumulo that hosts the PCJ results. (not null)
+     * @param pcjTableName - The name of the PCJ table that will be created. (not null)
+     * @param sparql - The SPARQL query whose results will be loaded into the table. (not null)
+     * @param resultVariables - The variables that are included in the query's resulting binding sets. (not null)
+     * @param pcjVarOrderFactory - An optional factory that indicates the various variable orders
+     *   the results will be stored in. If one is not provided, then {@link ShiftVarOrderFactory}
+     *   is used by default. (not null)
+     * @throws PCJStorageException The PCJ table could not be created or the values from
+     *   Rya were not able to be loaded into it.
+     */
+    public void createAndPopulatePcj(
+            final RepositoryConnection ryaConn,
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final String sparql,
+            final String[] resultVariables,
+            final Optional<PcjVarOrderFactory> pcjVarOrderFactory) throws PCJStorageException {
+        checkNotNull(ryaConn);
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        checkNotNull(sparql);
+        checkNotNull(resultVariables);
+        checkNotNull(pcjVarOrderFactory);
+
+        // Create the PCJ's variable orders.
+        final PcjVarOrderFactory varOrderFactory = pcjVarOrderFactory.or(DEFAULT_VAR_ORDER_FACTORY);
+        final Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders( new VariableOrder(resultVariables) );
+
+        // Create the PCJ table in Accumulo.
+        createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+        // Load historic matches from Rya into the PCJ table.
+        populatePcj(accumuloConn, pcjTableName, ryaConn);
+    }
+
+    /**
+     * List the table names of the PCJ index tables that are stored in Accumulo
+     * for a specific instance of Rya.
+     *
+     * @param accumuloConn - Connects to the accumulo that hosts the PCJ indices. (not null)
+     * @param ryaInstanceName - The name of the Rya instance. (not null)
+     * @return A list of Accumulo table names that hold PCJ index data for a
+     *   specific Rya instance.
+     */
+    public List<String> listPcjTables(final Connector accumuloConn, final String ryaInstanceName) {
+        checkNotNull(accumuloConn);
+        checkNotNull(ryaInstanceName);
+
+        final List<String> pcjTables = new ArrayList<>();
+
+        final String pcjPrefix = ryaInstanceName + "INDEX";
+        boolean foundInstance = false;
+
+        for(final String tableName : accumuloConn.tableOperations().list()) {
+            if(tableName.startsWith(ryaInstanceName)) {
+                // This table is part of the target Rya instance.
+                foundInstance = true;
+
+                if(tableName.startsWith(pcjPrefix)) {
+                    pcjTables.add(tableName);
+                }
+            }
+
+            else if(foundInstance) {
+                // We have encountered the first table name that does not start
+                // with the rya instance name after those that do. Because the
+                // list is sorted, there can't be any more pcj tables for the
+                // target instance in the list.
+                break;
+            }
+        }
+
+        return pcjTables;
+    }
+
+    /**
+     * Deletes all of the rows that are in a PCJ index and sets its cardinality back to 0.
+     *
+     * @param accumuloConn - Connects to the Accumulo that hosts the PCJ indices. (not null)
+     * @param pcjTableName - The name of the PCJ table that will be purged. (not null)
+     * @throws PCJStorageException Either the rows could not be dropped from the
+     *   PCJ table or the metadata could not be written back to the table.
+     */
+    public void purgePcjTable(final Connector accumuloConn, final String pcjTableName) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+
+        // Fetch the metadata from the PCJ table.
+        final PcjMetadata oldMetadata = getPcjMetadata(accumuloConn, pcjTableName);
+
+        // Delete all of the rows
+        try {
+            accumuloConn.tableOperations().deleteRows(pcjTableName, null, null);
+        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+            throw new PCJStorageException("Could not delete the rows of data from PCJ table named: " + pcjTableName, e);
+        }
+
+        // Store the new metadata.
+        final PcjMetadata newMetadata = new PcjMetadata(oldMetadata.getSparql(), 0L, oldMetadata.getVarOrders());
+        final List<Mutation> mutations = makeWriteMetadataMutations(newMetadata);
+
+        BatchWriter writer = null;
+        try {
+            writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+            writer.addMutations(mutations);
+            writer.flush();
+        } catch (final TableNotFoundException | MutationsRejectedException e) {
+            throw new PCJStorageException("Could not rewrite the PCJ cardinality for table named '"
+                    + pcjTableName + "'. This table will not work anymore.", e);
+        } finally {
+            if(writer != null) {
+                try {
+                    writer.close();
+                } catch (final MutationsRejectedException e) {
+                    throw new PCJStorageException("Could not close the batch writer.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Drops a PCJ index from Accumulo.
+     *
+     * @param accumuloConn - Connects to the Accumulo that hosts the PCJ indices. (not null)
+     * @param pcjTableName - The name of the PCJ table that will be dropped. (not null)
+     * @throws PCJStorageException The table could not be dropped because of
+     *   a security exception or because it does not exist.
+     */
+    public void dropPcjTable(final Connector accumuloConn, final String pcjTableName) throws PCJStorageException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        try {
+            accumuloConn.tableOperations().delete(pcjTableName);
+        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+            throw new PCJStorageException("Could not delete PCJ table named: " + pcjTableName, e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
new file mode 100644
index 0000000..f806d6e
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.Set;
+
+/**
+ * Create alternative variable orders for a SPARQL query based on
+ * the original ordering of its results.
+ */
+public interface PcjVarOrderFactory {
+
+    /**
+     * Create alternative variable orders for a SPARQL query based on
+     * the original ordering of its results.
+     *
+     * @param varOrder - The initial variable order of a SPARQL query. (not null)
+     * @return A set of alternative variable orders for the original.
+     */
+    public Set<VariableOrder> makeVarOrders(VariableOrder varOrder);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
new file mode 100644
index 0000000..1ae21e5
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Shifts the variables to the left so that each variable will appear at
+ * the head of the varOrder once.
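+ * <p>
+ * For example (illustrative only; the variable names are arbitrary):
+ * <pre>
+ *   new ShiftVarOrderFactory().makeVarOrders( new VariableOrder("a", "b", "c") );
+ *   // produces the orders:  a;b;c   b;c;a   c;a;b
+ * </pre>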
+ */
+@ParametersAreNonnullByDefault
+public class ShiftVarOrderFactory implements PcjVarOrderFactory {
+    @Override
+    public Set<VariableOrder> makeVarOrders(final VariableOrder varOrder) {
+        final Set<VariableOrder> varOrders = new HashSet<>();
+
+        final List<String> cyclicBuff = Lists.newArrayList( varOrder.getVariableOrders() );
+        final String[] varOrderBuff = new String[ cyclicBuff.size() ];
+
+        for(int shift = 0; shift < cyclicBuff.size(); shift++) {
+            // Build a variable order.
+            for(int i = 0; i < cyclicBuff.size(); i++) {
+                varOrderBuff[i] = cyclicBuff.get(i);
+            }
+            varOrders.add( new VariableOrder(varOrderBuff) );
+
+            // Shift the order the variables will appear in the cyclic buffer.
+            cyclicBuff.add( cyclicBuff.remove(0) );
+        }
+
+        return varOrders;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
new file mode 100644
index 0000000..ef88d8c
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.Immutable;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An ordered list of {@link BindingSet} variable names. These specify the order
+ * in which {@link Binding}s within the set are serialized to Accumulo. This
+ * order affects which rows a prefix scan will hit.
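+ * <p>
+ * For example (illustrative only), {@code new VariableOrder("a", "b", "c")} is
+ * rendered by {@link #toString()} as the String {@code "a;b;c"}.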
+ */
+@Immutable
+@ParametersAreNonnullByDefault
+public final class VariableOrder implements Iterable<String> {
+
+    public static final String VAR_ORDER_DELIM = ";";
+
+    private final ImmutableList<String> variableOrder;
+
+    /**
+     * Constructs an instance of {@link VariableOrder}.
+     *
+     * @param varOrder - An ordered array of Binding Set variables. (not null)
+     */
+    public VariableOrder(final String... varOrder) {
+        checkNotNull(varOrder);
+        variableOrder = ImmutableList.copyOf(varOrder);
+    }
+
+    /**
+     * Constructs an instance of {@link VariableOrder}.
+     *
+     * @param varOrder - An ordered collection of Binding Set variables. (not null)
+     */
+    public VariableOrder(final Collection<String> varOrder) {
+        checkNotNull(varOrder);
+        variableOrder = ImmutableList.copyOf(varOrder);
+    }
+
+    /**
+     * Constructs an instance of {@link VariableOrder}.
+     *
+     * @param varOrderString - The String representation of a VariableOrder. (not null)
+     */
+    public VariableOrder(final String varOrderString) {
+        checkNotNull(varOrderString);
+        variableOrder = ImmutableList.copyOf( varOrderString.split(VAR_ORDER_DELIM) );
+    }
+
+    /**
+     * @return An ordered list of Binding Set variables.
+     */
+    public ImmutableList<String> getVariableOrders() {
+        return variableOrder;
+    }
+
+    /**
+     * @return The variable order as an ordered array of Strings. This array is mutable.
+     */
+    public String[] toArray() {
+        final String[] array = new String[ variableOrder.size() ];
+        return variableOrder.toArray( array );
+    }
+
+    @Override
+    public String toString() {
+        return Joiner.on(VAR_ORDER_DELIM).join(variableOrder);
+    }
+
+    @Override
+    public int hashCode() {
+        return variableOrder.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if(this == o) {
+            return true;
+        } else if(o instanceof VariableOrder) {
+            final VariableOrder varOrder = (VariableOrder) o;
+            return variableOrder.equals( varOrder.variableOrder );
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<String> iterator() {
+        return variableOrder.iterator();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java
new file mode 100644
index 0000000..b9f4a1f
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.BindingSet;
+
+/**
+ * Decorates a {@link BindingSet} with a collection of visibilities.
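+ * <p>
+ * For example (illustrative only), {@code new VisibilityBindingSet(bindingSet, "A&B")}
+ * associates the visibility expression {@code A&B} with the decorated set, while the
+ * single-argument constructor leaves the visibility empty.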
+ */
+@ParametersAreNonnullByDefault
+public class VisibilityBindingSet extends BindingSetDecorator {
+    private static final long serialVersionUID = 1L;
+    private final String visibility;
+    private volatile int hashCode;
+
+    /**
+     * @param set - The {@link BindingSet} to decorate with no visibilities.
+     */
+    public VisibilityBindingSet(final BindingSet set) {
+        this(set, "");
+    }
+
+    /**
+     * Creates a new {@link VisibilityBindingSet}.
+     * @param set - The {@link BindingSet} to decorate.
+     * @param visibility - The visibilities on the {@link BindingSet}. (not null)
+     */
+    public VisibilityBindingSet(final BindingSet set, final String visibility) {
+        super(set);
+        this.visibility = checkNotNull(visibility);
+    }
+
+    /**
+     * @return The visibilities on the {@link BindingSet}.
+     */
+    public String getVisibility() {
+        return visibility;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        } else if(o instanceof VisibilityBindingSet) {
+            final VisibilityBindingSet other = (VisibilityBindingSet) o;
+            return set.equals(other) && visibility.equals(other.getVisibility());
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = hashCode;
+        if(result == 0) {
+            result = 31 * result + visibility.hashCode();
+            result = 31 * result + super.hashCode();
+            hashCode = result;
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder(super.toString());
+        sb.append("\n  Visibility: " + getVisibility() + "\n");
+        return sb.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java
new file mode 100644
index 0000000..8ff01ac
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Strings;
+
+/**
+ * Converts {@link BindingSet}s to Strings and back again. The Strings do not
+ * include the binding names and are ordered with a {@link VariableOrder}.
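+ * <p>
+ * When the converted {@link BindingSet} is a {@link VisibilityBindingSet} with a
+ * non-empty visibility, the visibility is appended after a {@link #VISIBILITY_DELIM}
+ * character. A sketch of the resulting layout:
+ * <pre>
+ *   [binding values from BindingSetStringConverter][VISIBILITY_DELIM][visibility]
+ * </pre>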
+ */
+@ParametersAreNonnullByDefault
+public class VisibilityBindingSetStringConverter extends BindingSetStringConverter {
+    public static final char VISIBILITY_DELIM = 1;
+
+    @Override
+    public String convert(final BindingSet bindingSet, final VariableOrder varOrder) {
+        String visibility = "";
+        if(bindingSet instanceof VisibilityBindingSet) {
+            final VisibilityBindingSet visiSet = (VisibilityBindingSet) bindingSet;
+            if(!Strings.isNullOrEmpty(visiSet.getVisibility())) {
+                visibility = VISIBILITY_DELIM + visiSet.getVisibility();
+            }
+        }
+        return super.convert(bindingSet, varOrder) + visibility;
+    }
+
+    @Override
+    public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) {
+        final String[] visiStrings = bindingSetString.split("" + VISIBILITY_DELIM);
+        BindingSet bindingSet = super.convert(visiStrings[0], varOrder);
+
+        if(visiStrings.length > 1) {
+            bindingSet = new VisibilityBindingSet(bindingSet, visiStrings[1]);
+        } else {
+            bindingSet = new VisibilityBindingSet(bindingSet);
+        }
+        return bindingSet;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
new file mode 100644
index 0000000..5cf01c5
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.update;
+
+import java.util.Collection;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.rya.indexing.pcj.storage.PcjException;
+
+import mvm.rya.api.domain.RyaStatement;
+
+/**
+ * Updates the state of all PCJ indices whenever {@link RyaStatement}s are
+ * added to or removed from the system.
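+ * <p>
+ * A minimal usage sketch ({@code updater} is any implementation of this interface;
+ * the statement collections are hypothetical):
+ * <pre>
+ *   updater.addStatements(newStatements);
+ *   updater.deleteStatements(removedStatements);
+ *   updater.flush();
+ *   updater.close();
+ * </pre>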
+ */
+@ParametersAreNonnullByDefault
+public interface PrecomputedJoinUpdater {
+
+    /**
+     * The PCJ indices will be updated to include new statements within
+     * their results.
+     *
+     * @param statements - The statements that will be used to update the index. (not null)
+     * @throws PcjUpdateException The statements could not be added to the index.
+     */
+    public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException;
+
+    /**
+     * The PCJ indices will be updated to remove any results that are
+     * derived from the provided statements.
+     * <p>
+     * A result will only be deleted from the index if all statements
+     * it is derived from are removed. For example, suppose the following
+     * instructions execute:
+     * <pre>
+     *   Insert Statement A
+     *   Insert Statement B
+     *   Insert Statement C
+     *   A and B Join to create Result A
+     *   B and C Join to create Result A again
+     *   Delete Statement A
+     * </pre>
+     * Result A will remain in the index because B and C have not been
+     * deleted. However, if either B or C is deleted, then the result will
+     * also be deleted because it can no longer be derived from the remaining
+     * information.
+     *
+     * @param statements - The statements that will be used to update the index. (not null)
+     * @throws PcjUpdateException The statements could not be removed from the index.
+     */
+    public void deleteStatements(Collection<RyaStatement> statements) throws PcjUpdateException;
+
+    /**
+     * If the updater does any batching, then this will force it to flush immediately.
+     *
+     * @throws PcjUpdateException The updater could not be flushed.
+     */
+    public void flush() throws PcjUpdateException;
+
+    /**
+     * Cleans up any resources required to perform the updates (sockets, streams, etc).
+     *
+     * @throws PcjUpdateException The updater could not be closed.
+     */
+    public void close() throws PcjUpdateException;
+
+    /**
+     * An operation of {@link PrecomputedJoinUpdater} failed.
+     */
+    public static class PcjUpdateException extends PcjException {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Constructs a new exception with the specified detail message. The cause
+         * is not initialized, and may subsequently be initialized by a call to
+         * {@link Throwable#initCause(java.lang.Throwable)}.
+         *
+         * @param message - The detail message. The detail message is saved for
+         *   later retrieval by the {@link Throwable#getMessage()} method.
+         */
+        public PcjUpdateException(final String message) {
+            super(message);
+        }
+
+        /**
+         * Constructs a new exception with the specified detail message and cause.
+         * <p>
+         * Note that the detail message associated with cause is not automatically
+         * incorporated in this exception's detail message.
+         *
+         * @param message - The detail message (which is saved for later retrieval
+         *   by the {@link Throwable#getMessage()} method).
+         * @param cause - The cause (which is saved for later retrieval by the
+         *   {@link Throwable#getCause()} method). (A null value is permitted, and
+         *   indicates that the cause is nonexistent or unknown.)
+         */
+        public PcjUpdateException(final String message, final Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
\ No newline at end of file