You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/12/05 19:49:23 UTC

[2/4] incubator-rya git commit: RYA-406. Closes #251.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
new file mode 100644
index 0000000..e095309
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds or removes a hash to or from the rowId for sharding purposes. When creating a sharded row key, it
+ * takes the form: node_prefix:shardId:nodeId//Binding_values. For example, the row key generated from nodeId = SP_123,
+ * varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob)
+ * indicates the shard id hash generated from the Binding value "uri:Bob".
+ *
+ */
+public class BindingHashShardingFunction {
+
+    private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+    private static final int HASH_LEN = 4;
+
+    /**
+     * Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For
+     * example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
+     * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
+     * Binding value "uri:Bob".
+     *
+     * @param nodeId - Node Id with type and UUID
+     * @param varOrder - VarOrder used to order BindingSet values
+     * @param bs - BindingSet with partially formed query values
+     * @return - serialized Bytes rowId for storing BindingSet results in Fluo
+     */
+    public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
+        checkNotNull(nodeId);
+        checkNotNull(varOrder);
+        checkNotNull(bs);
+        String[] rowPrefixAndId = nodeId.split("_");
+        Preconditions.checkArgument(rowPrefixAndId.length == 2);
+        String prefix = rowPrefixAndId[0];
+        String id = rowPrefixAndId[1];
+
+        String firstBindingString = "";
+        Bytes rowSuffix = Bytes.of(id);
+        if (varOrder.getVariableOrders().size() > 0) {
+            VariableOrder first = new VariableOrder(varOrder.getVariableOrders().get(0));
+            firstBindingString = BS_CONVERTER.convert(bs, first);
+            rowSuffix = RowKeyUtil.makeRowKey(id, varOrder, bs);
+        }
+
+        BytesBuilder builder = Bytes.builder();
+        builder.append(Bytes.of(prefix + ":"));
+        builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + firstBindingString)));
+        builder.append(":");
+        builder.append(rowSuffix);
+        return builder.toBytes();
+    }
+
+    /**
+     * Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For
+     * example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
+     * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
+     * Binding value "uri:Bob".
+     *
+     * @param nodeId - Node Id with type and UUID
+     * @param firstBsVal - String representation of the first BsValue
+     * @return - serialized Bytes prefix for scanning rows
+     */
+    public static Bytes getShardedScanPrefix(String nodeId, Value firstBsVal) {
+        checkNotNull(nodeId);
+        checkNotNull(firstBsVal);
+
+        final RyaType ryaValue = RdfToRyaConversions.convertValue(firstBsVal);
+        final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
+
+        return getShardedScanPrefix(nodeId, bindingString);
+    }
+
+    /**
+     * Generates a sharded rowId from the indicated nodeId and bindingString. For example, the row key generated from
+     * nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
+     * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
+     * Binding value "uri:Bob".
+     *
+     * @param nodeId - NodeId with tyep and UUID
+     * @param bindingString - String representation of BindingSet values, as formed by {@link BindingSetStringConverter}
+     * @return - serialized, sharded Bytes prefix
+     */
+    public static Bytes getShardedScanPrefix(String nodeId, String bindingString) {
+        checkNotNull(nodeId);
+        checkNotNull(bindingString);
+        String[] rowPrefixAndId = nodeId.split("_");
+        Preconditions.checkArgument(rowPrefixAndId.length == 2);
+        String prefix = rowPrefixAndId[0];
+        String id = rowPrefixAndId[1];
+
+        BytesBuilder builder = Bytes.builder();
+        builder.append(prefix + ":");
+        builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + bindingString)));
+        builder.append(":");
+        builder.append(id);
+        builder.append(NODEID_BS_DELIM);
+        builder.append(bindingString);
+        return builder.toBytes();
+    }
+
+    private static boolean hasHash(Bytes prefixBytes, Bytes row) {
+        for (int i = prefixBytes.length() + 1; i < prefixBytes.length() + HASH_LEN; i++) {
+            byte b = row.byteAt(i);
+            boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
+            if (!isAlphaNum) {
+                return false;
+            }
+        }
+
+        if (row.byteAt(prefixBytes.length()) != ':' || row.byteAt(prefixBytes.length() + HASH_LEN + 1) != ':') {
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @return Returns input with prefix and hash stripped from beginning.
+     */
+    public static Bytes removeHash(Bytes prefixBytes, Bytes row) {
+        checkNotNull(prefixBytes);
+        checkNotNull(row);
+        checkArgument(row.length() >= prefixBytes.length() + 6, "Row is shorter than expected " + row);
+        checkArgument(row.subSequence(0, prefixBytes.length()).equals(prefixBytes),
+                "Row does not have expected prefix " + row);
+        checkArgument(hasHash(prefixBytes, row), "Row does not have expected hash " + row);
+
+        BytesBuilder builder = Bytes.builder();
+        builder.append(prefixBytes);
+        builder.append("_");
+        builder.append(row.subSequence(prefixBytes.length() + 6, row.length()));
+        return builder.toBytes();
+    }
+
+    private static String genHash(Bytes row) {
+        int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
+        hash = hash & 0x7fffffff;
+        // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
+        // nice for debugging.
+        String hashString = Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
+        hashString = hashString.substring(hashString.length() - HASH_LEN);
+        return hashString;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
new file mode 100644
index 0000000..d451d28
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TRIPLE_PREFIX;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application.  The Triple prefix is added
+ * to supported range based compactions for removing transient data from the Fluo
+ * application.  This prefix supports the Transient data recipe described on the
+ * the Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+    private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of(TRIPLE_PREFIX + NODEID_BS_DELIM);
+
+    /**
+     * Prepends the triple prefix to the provided bytes and returns the new value as a {@link Bytes}.
+     * @param tripleBytes - serialized {@link RyaStatement}
+     * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes
+     */
+    public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) {
+        checkNotNull(tripleBytes);
+        BytesBuilder builder = Bytes.builder();
+        return builder.append(TRIPLE_PREFIX_BYTES).append(tripleBytes).toBytes();
+    }
+
+    /**
+     * Removes the triple prefix and returns the new value as a byte array.
+     * @param prefixedTriple - serialized RyaStatement with prepended triple prefix, converted to Bytes
+     * @return - serialized {@link RyaStatement} in byte array form
+     */
+    public static byte[] removeTriplePrefixAndConvertToByteArray(Bytes prefixedTriple) {
+        checkNotNull(prefixedTriple);
+        return prefixedTriple.subSequence(TRIPLE_PREFIX_BYTES.length(), prefixedTriple.length()).toArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
index 3df3708..f0faf1f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
new file mode 100644
index 0000000..657b548
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Sets;
+
+public class StatementPatternIdCacheTest {
+
+    @Test
+    public void testCache() {
+        Transaction mockTx = Mockito.mock(Transaction.class);
+        String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        when(mockTx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)).thenReturn(Bytes.of(nodeId));
+        when(mockTx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH)).thenReturn(Bytes.of("123"));
+
+        StatementPatternIdCache cache = new StatementPatternIdCache();
+        Set<String> ids1 = cache.getStatementPatternIds(mockTx);
+        Set<String> ids2 = cache.getStatementPatternIds(mockTx);
+
+        Assert.assertEquals(ids1, ids2);
+        Assert.assertEquals(Sets.newHashSet(nodeId), ids1);
+
+        Mockito.verify(mockTx, Mockito.times(1)).get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
new file mode 100644
index 0000000..729cdeb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class BindingHashShardingFunctionTest {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+
+    @Test
+    public void shardAddAndRemoveTest() {
+        String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("entity", vf.createURI("urn:entity"));
+        bs.addBinding("location", vf.createLiteral("location_1"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+        VariableOrder varOrder = new VariableOrder("entity","location");
+        Bytes row = RowKeyUtil.makeRowKey(nodeId, varOrder, vBs);
+        Bytes shardedRow = BindingHashShardingFunction.addShard(nodeId, varOrder, vBs);
+        Bytes shardlessRow = BindingHashShardingFunction.removeHash(Bytes.of(SP_PREFIX), shardedRow);
+        Assert.assertEquals(row, shardlessRow);
+    }
+
+    @Test
+    public void bindingSetRowTest() {
+        String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("entity", vf.createURI("urn:entity"));
+        bs.addBinding("location", vf.createLiteral("location_1"));
+        VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+        VariableOrder varOrder = new VariableOrder("entity","location");
+        Bytes row = RowKeyUtil.makeRowKey(nodeId, varOrder, vBs);
+        Bytes shardedRow = BindingHashShardingFunction.addShard(nodeId, varOrder, vBs);
+        BindingSetRow expected = BindingSetRow.make(row);
+        BindingSetRow actual = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), shardedRow);
+        Assert.assertEquals(expected, actual);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
new file mode 100644
index 0000000..ac41dc8
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.Arrays;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TriplePrefixUtilsTest {
+
+    @Test
+    public void testAddRemovePrefix() throws TripleRowResolverException {
+        byte[] expected = Bytes.of("triple").toArray();
+        Bytes fluoBytes = TriplePrefixUtils.addTriplePrefixAndConvertToBytes(expected);
+        byte[] returned = TriplePrefixUtils.removeTriplePrefixAndConvertToByteArray(fluoBytes);
+        Assert.assertEquals(true, Arrays.equals(expected, returned));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index c038dbe..e594474 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,14 +1,21 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-    license agreements. See the NOTICE file distributed with this work for additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    you under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <parent>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 66aa04b..7e882e9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -41,7 +41,7 @@ import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
@@ -54,13 +54,16 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.algebra.evaluation.QueryBindingSet;
 
 import com.google.common.base.Optional;
@@ -70,6 +73,7 @@ public class BatchIT extends RyaExportITBase {
 
     private static final Logger log = Logger.getLogger(BatchIT.class);
     private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+    private static final ValueFactory vf = new ValueFactoryImpl();
 
     @Test
     public void simpleScanDelete() throws Exception {
@@ -89,16 +93,16 @@ public class BatchIT extends RyaExportITBase {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName()).getQueryId();
+            String queryId = new CreateFluoPcj()
+                    .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
 
             // Stream the data into Fluo.
             InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements1, Optional.<String> absent());
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements1, Optional.absent());
+            inserter.insert(fluoClient, statements2, Optional.absent());
 
             // Verify the end results of the query match the expected results.
             getMiniFluo().waitForObservers();
@@ -129,22 +133,26 @@ public class BatchIT extends RyaExportITBase {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName()).getQueryId();
+            String queryId = new CreateFluoPcj()
+                    .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             String joinId = ids.get(2);
             String rightSp = ids.get(4);
             QueryBindingSet bs = new QueryBindingSet();
-            bs.addBinding("subject", new URIImpl("urn:subject_1"));
-            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            bs.addBinding("subject", vf.createURI("urn:subject_1"));
+            bs.addBinding("object1", vf.createURI("urn:object_0"));
             VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
-            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+            //create sharded span for deletion
+            URI uri = vf.createURI("urn:subject_1");
+            Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri);
+            Span span = Span.prefix(prefixBytes);
 
             // Stream the data into Fluo.
             InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements1, Optional.<String> absent());
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements1, Optional.absent());
+            inserter.insert(fluoClient, statements2, Optional.absent());
 
             getMiniFluo().waitForObservers();
             verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 25, 5, 5));
@@ -175,21 +183,24 @@ public class BatchIT extends RyaExportITBase {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain the PCJ.
-            String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName()).getQueryId();
+            String queryId = new CreateFluoPcj()
+                    .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
             String joinId = ids.get(2);
             String rightSp = ids.get(4);
             QueryBindingSet bs = new QueryBindingSet();
-            bs.addBinding("subject", new URIImpl("urn:subject_1"));
-            bs.addBinding("object1", new URIImpl("urn:object_0"));
+            bs.addBinding("subject", vf.createURI("urn:subject_1"));
+            bs.addBinding("object1", vf.createURI("urn:object_0"));
             VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
-            Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+            URI uri = vf.createURI("urn:subject_1");
+            Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri);
+            Span span = Span.prefix(prefixBytes);
 
             // Stream the data into Fluo.
             InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements2, Optional.absent());
 
             getMiniFluo().waitForObservers();
             verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 5));
@@ -214,7 +225,7 @@ public class BatchIT extends RyaExportITBase {
             RyaURI subj = new RyaURI("urn:subject_1");
             RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
             RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-            
+
             Set<RyaStatement> statements1 = getRyaStatements(statement1, 15);
             Set<RyaStatement> statements2 = getRyaStatements(statement2, 15);
 
@@ -224,22 +235,21 @@ public class BatchIT extends RyaExportITBase {
 
             // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
             // batch size of joins to 5.
-            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName()).getQueryId();
+            String queryId = new CreateFluoPcj(5, 5)
+                    .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
 
             // Stream the data into Fluo.
             InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements1, Optional.<String> absent());
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
+            inserter.insert(fluoClient, statements1, Optional.absent());
+            inserter.insert(fluoClient, statements2, Optional.absent());
 
             getMiniFluo().waitForObservers();
             verifyCounts(fluoClient, ids, Arrays.asList(225, 225, 225, 15, 15));
         }
     }
-    
-    
+
     @Test
     public void leftJoinBatchIntegrationTest() throws Exception {
         final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
@@ -249,10 +259,10 @@ public class BatchIT extends RyaExportITBase {
             RyaURI subj = new RyaURI("urn:subject_1");
             RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
             RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-            
+
             subj = new RyaURI("urn:subject_2");
             RyaStatement statement3 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
-            
+
             Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
             Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
             Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
@@ -263,37 +273,35 @@ public class BatchIT extends RyaExportITBase {
 
             // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
             // batch size of joins to 5.
-            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName()).getQueryId();
+            String queryId = new CreateFluoPcj(5, 5)
+                    .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
 
             // Stream the data into Fluo.
             InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements1, Optional.<String> absent());
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
-            inserter.insert(fluoClient, statements3, Optional.<String> absent());
+            inserter.insert(fluoClient, statements1, Optional.absent());
+            inserter.insert(fluoClient, statements2, Optional.absent());
+            inserter.insert(fluoClient, statements3, Optional.absent());
 
             getMiniFluo().waitForObservers();
             verifyCounts(fluoClient, ids, Arrays.asList(110, 110, 110, 20, 10));
         }
     }
-    
-    
+
     @Test
     public void multiJoinBatchIntegrationTest() throws Exception {
         final String sparql = "SELECT ?subject1 ?subject2 ?object1 ?object2 WHERE { ?subject1 <urn:predicate_1> ?object1; "
-                + " <urn:predicate_2> ?object2 ."
-                + " ?subject2 <urn:predicate_3> ?object2 } ";
+                + " <urn:predicate_2> ?object2 ." + " ?subject2 <urn:predicate_3> ?object2 } ";
         try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
 
             RyaURI subj1 = new RyaURI("urn:subject_1");
             RyaStatement statement1 = new RyaStatement(subj1, new RyaURI("urn:predicate_1"), null);
             RyaStatement statement2 = new RyaStatement(subj1, new RyaURI("urn:predicate_2"), null);
-            
+
             Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
             Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
-            
+
             RyaURI subj2 = new RyaURI("urn:subject_2");
             RyaStatement statement3 = new RyaStatement(subj2, new RyaURI("urn:predicate_3"), null);
             Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
@@ -304,23 +312,22 @@ public class BatchIT extends RyaExportITBase {
 
             // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
             // batch size of joins to 5.
-            String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
-                    getRyaInstanceName()).getQueryId();
+            String queryId = new CreateFluoPcj(5, 5)
+                    .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
 
             List<String> ids = getNodeIdStrings(fluoClient, queryId);
 
             // Stream the data into Fluo.
             InsertTriples inserter = new InsertTriples();
-            inserter.insert(fluoClient, statements1, Optional.<String> absent());
-            inserter.insert(fluoClient, statements2, Optional.<String> absent());
-            inserter.insert(fluoClient, statements3, Optional.<String> absent());
+            inserter.insert(fluoClient, statements1, Optional.absent());
+            inserter.insert(fluoClient, statements2, Optional.absent());
+            inserter.insert(fluoClient, statements3, Optional.absent());
 
             getMiniFluo().waitForObservers();
             verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 100, 10, 10, 10));
         }
     }
 
-
     private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
 
         Set<RyaStatement> statements = new HashSet<>();
@@ -361,13 +368,19 @@ public class BatchIT extends RyaExportITBase {
             for (int i = 0; i < ids.size(); i++) {
                 String id = ids.get(i);
                 String bsPrefix = prefixes.get(i);
+                URI uri = vf.createURI(bsPrefix);
+                Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(id, uri);
                 NodeType type = NodeType.fromNodeId(id).get();
                 Column bsCol = type.getResultColumn();
-                String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
-                Span span = Span.prefix(Bytes.of(row));
-                BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
-                        .build();
-                BatchInformationDAO.addBatch(tx, id, batch);
+                SpanBatchDeleteInformation.Builder builder = SpanBatchDeleteInformation.builder().setBatchSize(batchSize)
+                        .setColumn(bsCol);
+                if (type == NodeType.JOIN) {
+                    builder.setSpan(Span.prefix(type.getNodeTypePrefix()));
+                    builder.setNodeId(java.util.Optional.of(id));
+                } else {
+                    builder.setSpan(Span.prefix(prefixBytes));
+                }
+                BatchInformationDAO.addBatch(tx, id, builder.build());
             }
             tx.commit();
         }
@@ -383,14 +396,19 @@ public class BatchIT extends RyaExportITBase {
     private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
         try (Transaction tx = fluoClient.newTransaction()) {
             int count = 0;
-            RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
+            Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+            Bytes prefixBytes = Bytes.of(type.get().getNodeTypePrefix());
+            RowScanner scanner = tx.scanner().over(Span.prefix(prefixBytes)).fetch(bsColumn).byRow().build();
             Iterator<ColumnScanner> colScanners = scanner.iterator();
             while (colScanners.hasNext()) {
                 ColumnScanner colScanner = colScanners.next();
-                Iterator<ColumnValue> vals = colScanner.iterator();
-                while (vals.hasNext()) {
-                    vals.next();
-                    count++;
+                BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(prefixBytes, colScanner.getRow());
+                if (bsRow.getNodeId().equals(nodeId)) {
+                    Iterator<ColumnValue> vals = colScanner.iterator();
+                    while (vals.hasNext()) {
+                        vals.next();
+                        count++;
+                    }
                 }
             }
             tx.commit();
@@ -405,7 +423,6 @@ public class BatchIT extends RyaExportITBase {
             int expected = expectedCounts.get(i);
             NodeType type = NodeType.fromNodeId(id).get();
             int count = countResults(fluoClient, id, type.getResultColumn());
-            log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
             switch (type) {
             case STATEMENT_PATTERN:
                 assertEquals(expected, count);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 27b8222..ef5ab34 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -76,17 +76,18 @@ public class CreateDeleteIT extends RyaExportITBase {
         // Create the PCJ in Fluo and load the statements into Rya.
         final String pcjId = loadData(sparql, statements);
 
-        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+        try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) {
             // Ensure the data was loaded.
             final List<Bytes> rows = getFluoTableEntries(fluoClient);
-            assertEquals(18, rows.size());
+            assertEquals(19, rows.size());
 
             // Delete the PCJ from the Fluo application.
             new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
+            getMiniFluo().waitForObservers();
 
             // Ensure all data related to the query has been removed.
             final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
-            assertEquals(0, empty_rows.size());
+            assertEquals(1, empty_rows.size());
         }
     }
 
@@ -108,20 +109,21 @@ public class CreateDeleteIT extends RyaExportITBase {
         // Create the PCJ in Fluo and load the statements into Rya.
         final String pcjId = loadData(sparql, statements);
 
-        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+        try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) {
             // Ensure the data was loaded.
             final List<Bytes> rows = getFluoTableEntries(fluoClient);
-            assertEquals(10, rows.size());
+            assertEquals(11, rows.size());
 
             // Delete the PCJ from the Fluo application.
             new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
+            getMiniFluo().waitForObservers();
 
             // Ensure all data related to the query has been removed.
             final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
-            assertEquals(0, empty_rows.size());
+            assertEquals(1, empty_rows.size());
         }
     }
-    
+
 
     private String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
         requireNonNull(sparql);
@@ -133,14 +135,14 @@ public class CreateDeleteIT extends RyaExportITBase {
         final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet());
 
         // Write the data to Rya.
-        final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
+        final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
         ryaConn.begin();
         ryaConn.add(statements);
         ryaConn.commit();
         ryaConn.close();
 
         // Wait for the Fluo application to finish computing the end result.
-        super.getMiniFluo().waitForObservers();
+        getMiniFluo().waitForObservers();
 
         // The PCJ Id is the topic name the results will be written to.
         return pcjId;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
index e61104a..eb21b34 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
@@ -109,11 +109,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
                         vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
                 vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")));
 
-        runTest(query, statements, 29);
+        runTest(query, statements, 30);
 
     }
 
-   
+
 
     private void runTest(String query, Collection<Statement> statements, int expectedEntries) throws Exception {
         try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -134,10 +134,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
 
             DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, storage);
             deletePeriodic.deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notificationClient);
+            getMiniFluo().waitForObservers();
 
             // Ensure all data related to the query has been removed.
             final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
-            assertEquals(0, empty_rows.size());
+            assertEquals(1, empty_rows.size());
 
             // Ensure that Periodic Service notified to add and delete PeriodicNotification
             Set<CommandNotification> notifications;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
deleted file mode 100644
index fabf512..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package org.apache.rya.indexing.pcj.fluo.integration;
-
-import static java.util.Objects.requireNonNull;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeFactory;
-
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Statement;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class FluoLatencyIT extends KafkaExportITBase {
-    private static ValueFactory vf;
-    private static DatatypeFactory dtf;
-
-    @BeforeClass
-    public static void init() throws DatatypeConfigurationException {
-        vf = new ValueFactoryImpl();
-        dtf = DatatypeFactory.newInstance();
-    }
-
-    @Test
-    public void resultsExported() throws Exception {
-
-        final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { "
-                + "    ?obs <uri:hasTime> ?time. " + "    ?obs <uri:hasObsType> ?type " + "} " + "group by ?type";
-
-//        final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { "
-//                + "    ?obs <uri:hasTime> ?time. " + "    ?obs <uri:hasObsType> ?type " + "}";
-
-        try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
-            // Tell the Fluo app to maintain the PCJ.
-            String pcjId = FluoQueryUtils.createNewPcjId();
-            FluoConfiguration conf = super.getFluoConfiguration();
-            new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient);
-            SailRepositoryConnection conn = super.getRyaSailRepository().getConnection();
-
-            long start = System.currentTimeMillis();
-            int numReturned = 0;
-            int numObs = 10;
-            int numTypes = 5;
-            int numExpected = 0;
-            int increment = numObs*numTypes;
-            while (System.currentTimeMillis() - start < 60000) {
-                List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now());
-                conn.add(statements);
-                numExpected += increment;
-                System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: "
-                        + getNumFluoEntries(fluoClient));
-                numReturned += readAllResults(pcjId).size();
-                System.out
-                        .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned));
-//                FluoITHelper.printFluoTable(conf);
-                Thread.sleep(30000);
-            }
-        }
-    }
-
-    /**
-     * Generates (numObservationsPerType x numTypes) statements of the form:
-     *
-     * <pre>
-     * urn:obs_n uri:hasTime zonedTime
-     * urn:obs_n uri:hasObsType typePrefix_m
-     * </pre>
-     *
-     * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
-     * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes.
-     *
-     * @param numObservationsPerType - The quantity of observations per type to generate.
-     * @param numTypes - The number of types to generate observations for.
-     * @param typePrefix - The prefix to be used for the type literal in the statement.
-     * @param observationOffset - The offset to be used for determining the value of n in the above statements.
-     * @param zonedTime - The time to be used for all observations generated.
-     * @return A new list of all generated Statements.
-     */
-    public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix,
-            final long observationOffset, final ZonedDateTime zonedTime) {
-        final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
-        final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
-        final List<Statement> statements = Lists.newArrayList();
-
-        for (long i = 0; i < numObservationsPerType; i++) {
-            for (int j = 0; j < numTypes; j++) {
-                final long observationId = observationOffset + i * numTypes + j;
-                // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId;
-                // final String obsId = "urn:obs_" + observationId;
-                final String obsId = "urn:obs_" + String.format("%020d", observationId);
-                final String type = typePrefix + j;
-                // logger.info(obsId + " " + type + " " + litTime);
-                statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime));
-                statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type)));
-            }
-        }
-
-        return statements;
-    }
-
-    private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
-        requireNonNull(pcjId);
-
-        // Read all of the results from the Kafka topic.
-        final Set<VisibilityBindingSet> results = new HashSet<>();
-
-        try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
-            final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
-            final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
-            while (recordIterator.hasNext()) {
-                results.add(recordIterator.next().value());
-            }
-        }
-
-        return results;
-    }
-
-    private int getNumAccEntries(String tableName) throws TableNotFoundException {
-        Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations());
-        int count = 0;
-        for (Map.Entry<Key, Value> entry : scanner) {
-            count++;
-        }
-        return count;
-    }
-
-    private int getNumFluoEntries(FluoClient client) {
-        Transaction tx = client.newTransaction();
-        CellScanner scanner = tx.scanner().build();
-        int count = 0;
-        for (RowColumnValue rcv : scanner) {
-            count++;
-        }
-        return count;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index beaef32..2b87a97 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -123,9 +123,8 @@ public class QueryIT extends RyaExportITBase {
         // and are skilled with computers. The resulting binding set includes everybody who
         // was involved in the recruitment process.
         final String sparql = "SELECT ?recruiter ?candidate ?leader " + "{ " + "?recruiter <http://recruiterFor> <http://GeekSquad>. "
-                + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". "
-                + "?leader <http://leaderOf> <http://GeekSquad>. " + "?recruiter <http://talksTo> ?candidate. "
-                + "?candidate <http://talksTo> ?leader. " + "}";
+                + "?recruiter <http://talksTo> ?candidate. " + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". "
+                + "?candidate <http://talksTo> ?leader." + "?leader <http://leaderOf> <http://GeekSquad>. }";
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -426,10 +425,10 @@ public class QueryIT extends RyaExportITBase {
         runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
 
-    
+
     @Test
     public void dateTimeWithin() throws Exception {
-        
+
         final ValueFactory vf = new ValueFactoryImpl();
         DatatypeFactory dtf = DatatypeFactory.newInstance();
         FunctionRegistry.getInstance().add(new DateTimeWithinPeriod());
@@ -437,13 +436,13 @@ public class QueryIT extends RyaExportITBase {
         final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">"
                 + "SELECT ?event ?startTime ?endTime WHERE { ?event <uri:startTime> ?startTime; <uri:endTime> ?endTime. "
                 + "FILTER(fn:dateTimeWithin(?startTime, ?endTime, 2,<" + OWLTime.HOURS_URI + "> ))}";
-        
+
         ZonedDateTime zTime = ZonedDateTime.now();
         String time = zTime.format(DateTimeFormatter.ISO_INSTANT);
 
         ZonedDateTime zTime1 = zTime.minusHours(1);
         String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime2 = zTime.minusHours(2);
         String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
 
@@ -471,10 +470,10 @@ public class QueryIT extends RyaExportITBase {
         // Verify the end results of the query match the expected results.
         runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
     }
-    
+
     @Test
     public void dateTimeWithinNow() throws Exception {
-        
+
         final ValueFactory vf = new ValueFactoryImpl();
         DatatypeFactory dtf = DatatypeFactory.newInstance();
         FunctionRegistry.getInstance().add(new DateTimeWithinPeriod());
@@ -482,13 +481,13 @@ public class QueryIT extends RyaExportITBase {
         final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">"
                 + "SELECT ?event ?startTime WHERE { ?event <uri:startTime> ?startTime. "
                 + "FILTER(fn:dateTimeWithin(?startTime, NOW(), 15, <" + OWLTime.SECONDS_URI + "> ))}";
-        
+
         ZonedDateTime zTime = ZonedDateTime.now();
         String time = zTime.format(DateTimeFormatter.ISO_INSTANT);
 
         ZonedDateTime zTime1 = zTime.minusSeconds(30);
         String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         Literal lit = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
         Literal lit1 = vf.createLiteral(dtf.newXMLGregorianCalendar(time1));
 
@@ -511,7 +510,7 @@ public class QueryIT extends RyaExportITBase {
     }
 
 
-    
+
     @Test
     public void periodicQueryTestWithoutAggregation() throws Exception {
         String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -800,8 +799,8 @@ public class QueryIT extends RyaExportITBase {
         // Verify the end results of the query match the expected results.
         runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
-    
-    
+
+
     @Test
     public void nestedPeriodicQueryTestWithAggregationAndGroupBy() throws Exception {
         String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -879,7 +878,7 @@ public class QueryIT extends RyaExportITBase {
         // Verify the end results of the query match the expected results.
         runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
     }
-    
+
     @Test
     public void nestedJoinPeriodicQueryWithAggregationAndGroupBy() throws Exception {
         String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -1006,6 +1005,7 @@ public class QueryIT extends RyaExportITBase {
                 // Ensure the result of the query matches the expected result.
                 assertEquals(expectedResults, results);
             }
+
             break;
         case PERIODIC:
             PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
@@ -1014,7 +1014,7 @@ public class QueryIT extends RyaExportITBase {
                 new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluo);
             }
             addStatementsAndWait(statements);
-            
+
             final Set<BindingSet> results = Sets.newHashSet();
             try (CloseableIterator<BindingSet> resultIter = periodicStorage.listResults(periodicId, Optional.empty())) {
                 while (resultIter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
new file mode 100644
index 0000000..91c9977
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class StatementPatternIdCacheIT extends RyaExportITBase {
+
+    /**
+     * Ensure streamed matches are included in the result.
+     */
+    @Test
+    public void statementPatternIdCacheTest() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql1 =
+              "SELECT ?x WHERE { " +
+                "?x <urn:pred1> <urn:obj1>. " +
+                "?x <urn:pred2> <urn:obj2>." +
+              "}";
+
+        final String sparql2 =
+                "SELECT ?x WHERE { " +
+                  "?x <urn:pred3> <urn:obj3>. " +
+                  "?x <urn:pred4> <urn:obj4>." +
+                "}";
+
+        try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+
+            String pcjId = FluoQueryUtils.createNewPcjId();
+            // Tell the Fluo app to maintain the PCJ.
+            FluoQuery query1 = new CreateFluoPcj().createPcj(pcjId, sparql1, new HashSet<>(), fluoClient);
+            Set<String> spIds1 = new HashSet<>();
+            for(StatementPatternMetadata metadata: query1.getStatementPatternMetadata()) {
+                spIds1.add(metadata.getNodeId());
+            }
+
+            StatementPatternIdCache cache = new StatementPatternIdCache();
+
+            assertEquals(spIds1, cache.getStatementPatternIds(fluoClient.newTransaction()));
+
+            FluoQuery query2 = new CreateFluoPcj().createPcj(pcjId, sparql2, new HashSet<>(), fluoClient);
+            Set<String> spIds2 = new HashSet<>();
+            for(StatementPatternMetadata metadata: query2.getStatementPatternMetadata()) {
+                spIds2.add(metadata.getNodeId());
+            }
+
+            assertEquals(Sets.union(spIds1, spIds2), cache.getStatementPatternIds(fluoClient.newTransaction()));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a5086ee
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Valid levels:
+# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
+log4j.rootLogger=INFO, CONSOLE
+
+# Set independent logging levels
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.kafka=WARN
+#log4j.logger.org.apache.rya.indexing.pcj.fluo=DEBUG
+
+# LOGFILE is set to be a File appender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+#log4j.appender.CONSOLE.Threshold=DEBUG
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.default.file=org.apache.log4j.FileAppender
+#log4j.appender.default.file.file=/home/cloudera/Desktop/log.out
+#log4j.appender.default.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.default.file.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
+#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
index 32ee962..48334d0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
@@ -48,41 +48,42 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
-import org.apache.rya.accumulo.MiniAccumuloSingleton;
-import org.apache.rya.accumulo.RyaTestInstanceRule;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloInstall;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailException;
-
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.accumulo.MiniAccumuloSingleton;
+import org.apache.rya.accumulo.RyaTestInstanceRule;
 import org.apache.rya.api.client.Install;
 import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
 import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloInstall;
 import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
 import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
 import org.apache.rya.sail.config.RyaSailFactory;
+import org.apache.zookeeper.ClientCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
 
 /**
  * Integration tests that ensure the Fluo application processes PCJs results
@@ -156,6 +157,7 @@ public abstract class FluoITBase {
             } catch (final Exception e) {
                 log.error("Could not shut down the Rya Connection.", e);
             }
+
         }
 
         if (ryaRepo != null) {
@@ -187,6 +189,9 @@ public abstract class FluoITBase {
                 log.error("Could not shut down the Mini Fluo.", e);
             }
         }
+
+        StatementPatternIdCacheSupplier.clear();
+        MetadataCacheSupplier.clear();
     }
 
     protected void preFluoInitHook() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 7b16dcf..ca17b2a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -53,6 +53,7 @@ import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
@@ -64,11 +65,14 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
 import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.repository.sail.SailRepositoryConnection;
 import org.openrdf.sail.Sail;
@@ -115,6 +119,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         // Setup the observers that will be used by the Fluo PCJ Application.
         final List<ObserverSpecification> observers = new ArrayList<>();
         observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new ObserverSpecification(BatchObserver.class.getName()));
         observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
         observers.add(new ObserverSpecification(JoinObserver.class.getName()));
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
@@ -185,6 +190,12 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         }
     }
 
+    @After
+    public void clearCaches() {
+        StatementPatternIdCacheSupplier.clear();
+        MetadataCacheSupplier.clear();
+    }
+
     private void installRyaInstance() throws Exception {
         final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
         final String instanceName = cluster.getInstanceName();
@@ -261,7 +272,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
      * If this test fails then its a testing environment issue, not with Rya.
      * Source: https://github.com/asmaier/mini-kafka
      */
-//    @Test
+    @Test
     public void embeddedKafkaTest() throws Exception {
         // create topic
         final String topic = "testTopic";

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
index 970cfd8..f540a2e 100644
--- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
+++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
@@ -132,6 +132,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
                 "PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " +
                 "SELECT ?cityA ?cityB " +
                 "WHERE { " +
+                    "?cityA <urn:containedIn> ?continent. " +
+                    "?cityB <urn:containedIn> ?continent. " +
                     "?cityA geo:asWKT ?coord1 . " +
                     "?cityB geo:asWKT ?coord2 . " +
                     // from brussels 173km to amsterdam
@@ -147,9 +149,13 @@ public class GeoFunctionsIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), asWKT, vf.createLiteral("Point(-17.45 14.69)", wktTypeUri)),
                 vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
                 vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
-                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)));
+                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
+                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
+                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")),
+                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")),
+                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")));
 
-        // The expected results of the SPARQL query once the PCJ has been computed.
+        // The expected results of the SPARQL query once the PCJ has been computed.l
         final Set<BindingSet> expectedResults = new HashSet<>();
 
         MapBindingSet bs = new MapBindingSet();
@@ -234,6 +240,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
                 "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "  +
                 "PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> "  +
                 "SELECT ?cityA ?cityB { "  +
+                    "?cityA <urn:containedIn> ?continent. " +
+                    "?cityB <urn:containedIn> ?continent. " +
                     "?cityA geo:asWKT ?coord1 . " +
                     "?cityB geo:asWKT ?coord2 . " +
                     " FILTER ( geof:sfIntersects(?coord1, ?coord2) ) " +
@@ -248,6 +256,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
                 vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
                 vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
+                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
+                vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
                 vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)));
 
         // The expected results of the SPARQL query once the PCJ has been computed.