You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:49:06 UTC

[49/50] [abbrv] incubator-rya git commit: RYA-377 Code review.

RYA-377 Code review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f0725df5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f0725df5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f0725df5

Branch: refs/heads/master
Commit: f0725df5921a45adc37bc5cf73b5f70dfd886ac5
Parents: f365521
Author: kchilton2 <ke...@gmail.com>
Authored: Mon Jan 8 16:41:30 2018 -0500
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:02 2018 -0500

----------------------------------------------------------------------
 common/rya.api.function/pom.xml                 |  4 +-
 .../function/aggregation/AverageFunction.java   |  3 +
 .../api/function/aggregation/CountFunction.java |  3 +
 .../api/function/aggregation/MaxFunction.java   |  3 +
 .../api/function/aggregation/MinFunction.java   |  3 +
 .../api/function/aggregation/SumFunction.java   |  3 +
 .../apache/rya/api/utils/CloseableIterator.java | 31 ++++++++
 .../accumulo/AccumuloBatchUpdatePCJIT.java      |  2 +-
 .../TimestampedNotificationProcessor.java       |  2 +-
 .../PeriodicNotificationApplicationIT.java      |  2 +-
 .../pruner/PeriodicNotificationBinPrunerIT.java | 14 ++--
 .../pcj/storage/PeriodicQueryResultStorage.java | 28 +++----
 .../pcj/storage/PrecomputedJoinStorage.java     | 11 +--
 .../storage/accumulo/AccumuloPcjStorage.java    |  1 +
 .../AccumuloPeriodicQueryResultStorage.java     |  2 +-
 .../AccumuloValueBindingSetIterator.java        |  2 +-
 .../pcj/storage/accumulo/PcjTables.java         |  3 +-
 .../accumulo/ScannerBindingSetIterator.java     |  2 +-
 .../pcj/storage/accumulo/PcjTablesIT.java       |  4 +-
 .../integration/AccumuloPcjStorageIT.java       |  2 +-
 .../AccumuloPeriodicQueryResultStorageIT.java   | 84 ++++++++++----------
 extras/rya.pcj.fluo/pcj.fluo.app/pom.xml        |  2 +-
 .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java  |  2 +-
 .../indexing/pcj/fluo/integration/InputIT.java  |  2 +-
 .../indexing/pcj/fluo/integration/QueryIT.java  |  2 +-
 .../RyaInputIncrementalUpdateIT.java            |  2 +-
 .../pcj/fluo/integration/StreamingTestIT.java   |  2 +-
 .../pcj/fluo/visibility/PcjVisibilityIT.java    |  2 +-
 extras/rya.streams/api/pom.xml                  |  5 --
 extras/rya.streams/kafka/pom.xml                |  2 +-
 .../kafka/processors/ProcessorResult.java       |  2 +-
 .../processors/join/CloseableIterator.java      | 32 --------
 .../processors/join/JoinProcessorSupplier.java  |  3 +-
 .../kafka/processors/join/JoinStateStore.java   |  1 +
 .../processors/join/KeyValueJoinStateStore.java |  1 +
 extras/rya.streams/pom.xml                      | 10 ++-
 pom.xml                                         |  2 +-
 37 files changed, 146 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api.function/pom.xml b/common/rya.api.function/pom.xml
index ce88e36..5b7ee0a 100644
--- a/common/rya.api.function/pom.xml
+++ b/common/rya.api.function/pom.xml
@@ -27,8 +27,8 @@ under the License.
         <version>3.2.12-incubating-SNAPSHOT</version>
     </parent>
 
-    <artifactId>rya.api.function</artifactId>
-    <name>Apache Rya Common API - Functions</name>
+    <artifactId>rya.api.evaluation</artifactId>
+    <name>Apache Rya Common API - Evaluation Functions</name>
 
     <dependencies>
         <!-- Rya dependencies. -->        

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
index a73d5ac..4a31fce 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
@@ -19,6 +19,7 @@
 package org.apache.rya.api.function.aggregation;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -51,6 +52,8 @@ public final class AverageFunction implements AggregationFunction {
     @Override
     public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
         checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
+        requireNonNull(state);
+        requireNonNull(childBindingSet);
 
         // Only update the average if the child contains the binding that we are averaging.
         final String aggregatedName = aggregation.getAggregatedBindingName();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
index 7dd5b21..879df5e 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
@@ -19,6 +19,7 @@
 package org.apache.rya.api.function.aggregation;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 import java.math.BigInteger;
 
@@ -39,6 +40,8 @@ public final class CountFunction implements AggregationFunction {
     @Override
     public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
         checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
+        requireNonNull(state);
+        requireNonNull(childBindingSet);
 
         // Only add one to the count if the child contains the binding that we are counting.
         final String aggregatedName = aggregation.getAggregatedBindingName();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
index 3295fbb..5b5d493 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
@@ -19,6 +19,7 @@
 package org.apache.rya.api.function.aggregation;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.openrdf.model.Value;
@@ -40,6 +41,8 @@ public final class MaxFunction implements AggregationFunction {
     @Override
     public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
         checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
+        requireNonNull(state);
+        requireNonNull(childBindingSet);
 
         // Only update the max if the child contains the binding that we are finding the max value for.
         final String aggregatedName = aggregation.getAggregatedBindingName();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
index d6bf751..f1b083c 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
@@ -19,6 +19,7 @@
 package org.apache.rya.api.function.aggregation;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.openrdf.model.Value;
@@ -40,6 +41,8 @@ public final class MinFunction implements AggregationFunction {
     @Override
     public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
         checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
+        requireNonNull(state);
+        requireNonNull(childBindingSet);
 
         // Only update the min if the child contains the binding that we are finding the min value for.
         final String aggregatedName = aggregation.getAggregatedBindingName();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
index 97735f2..7ddc9ae 100644
--- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
+++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
@@ -19,6 +19,7 @@
 package org.apache.rya.api.function.aggregation;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 import java.math.BigInteger;
 
@@ -48,6 +49,8 @@ public final class SumFunction implements AggregationFunction {
     @Override
     public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
         checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
+        requireNonNull(state);
+        requireNonNull(childBindingSet);
 
         // Only add values to the sum if the child contains the binding that we are summing.
         final String aggregatedName = aggregation.getAggregatedBindingName();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java
new file mode 100644
index 0000000..c29f5e0
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources
+ * that need to be released once you are done iterating.
+ *
+ * @param <T> The type of object that is iterated over.
+ */
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
index 40941c8..5028454 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -27,11 +27,11 @@ import org.apache.rya.accumulo.AccumuloITBase;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.api.client.Install.InstallConfiguration;
 import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
index ae586da..dcc47b6 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
@@ -22,8 +22,8 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import org.apache.rya.periodic.notification.api.NodeBin;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
index 92e3276..cd06f2a 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -54,6 +54,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
@@ -61,7 +62,6 @@ import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.apache.rya.periodic.notification.notification.CommandNotification;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
index 830fa46..ac2202c 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -39,6 +39,7 @@ import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
@@ -48,7 +49,6 @@ import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
@@ -68,7 +68,7 @@ import com.google.common.collect.Sets;
 
 public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
 
-    
+
     @Test
     public void periodicPrunerTest() throws Exception {
 
@@ -238,7 +238,7 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
         pruner.stop();
 
     }
-    
+
     private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception {
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) {
             Set<BindingSet> actual = new HashSet<>();
@@ -248,13 +248,13 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
             Assert.assertEquals(expected, actual);
         }
     }
-    
+
     private void compareFluoCounts(FluoClient client, String pcjId, long bin) {
         QueryBindingSet bs = new QueryBindingSet();
         bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG));
-        
+
         VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
-        
+
         try(Snapshot sx = client.newSnapshot()) {
             String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
             Set<String> ids = new HashSet<>();
@@ -279,5 +279,5 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
             }
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
index 6637dde..2936738 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Optional;
 
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.openrdf.query.BindingSet;
 
@@ -32,7 +32,7 @@ import org.openrdf.query.BindingSet;
  *
  */
 public interface PeriodicQueryResultStorage {
-    
+
     /**
      * Binding name for the periodic bin id
      */
@@ -45,27 +45,27 @@ public interface PeriodicQueryResultStorage {
      * @throws PeriodicQueryStorageException
      */
     public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException;
-    
+
     /**
      * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id
      * @param queryId - id of the storage layer for the given SPARQL query
      * @param sparql - SPARQL query whose periodic results will be stored
-     * @return - id of the storage layer 
+     * @return - id of the storage layer
      * @throws PeriodicQueryStorageException
      */
     public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException;
-    
+
     /**
      * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id
      * whose results are written in the order indicated by the specified VariableOrder.
      * @param queryId - id of the storage layer for the given SPARQL query
      * @param sparql - SPARQL query whose periodic results will be stored
      * @param varOrder - VariableOrder indicating the order that results will be written in
-     * @return - id of the storage layer 
+     * @return - id of the storage layer
      * @throws PeriodicQueryStorageException
      */
     public void createPeriodicQuery(String queryId, String sparql, VariableOrder varOrder) throws PeriodicQueryStorageException;
-    
+
     /**
      * Retrieve the {@link PeriodicQueryStorageMetdata} for the give query id
      * @param queryID - id of the query whose metadata will be returned
@@ -73,7 +73,7 @@ public interface PeriodicQueryResultStorage {
      * @throws PeriodicQueryStorageException
      */
     public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryID) throws PeriodicQueryStorageException;;
-    
+
     /**
      * Add periodic query results to the storage layer indicated by the given query id
      * @param queryId - id indicating the storage layer that results will be added to
@@ -81,7 +81,7 @@ public interface PeriodicQueryResultStorage {
      * @throws PeriodicQueryStorageException
      */
     public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException;;
-    
+
     /**
      * Deletes periodic query results from the storage layer
      * @param queryId - id indicating the storage layer that results will be deleted from
@@ -89,14 +89,14 @@ public interface PeriodicQueryResultStorage {
      * @throws PeriodicQueryStorageException
      */
     public void deletePeriodicQueryResults(String queryId, long binID) throws PeriodicQueryStorageException;;
-    
+
     /**
-     * Deletes all results for the storage layer indicated by the given query id 
+     * Deletes all results for the storage layer indicated by the given query id
      * @param queryID - id indicating the storage layer whose results will be deleted
      * @throws PeriodicQueryStorageException
      */
     public void deletePeriodicQuery(String queryID) throws PeriodicQueryStorageException;;
-    
+
     /**
      * List results in the given storage layer indicated by the query id
      * @param queryId - id indicating the storage layer whose results will be listed
@@ -105,11 +105,11 @@ public interface PeriodicQueryResultStorage {
      * @throws PeriodicQueryStorageException
      */
     public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binID) throws PeriodicQueryStorageException;;
-    
+
     /**
      * List all storage tables containing periodic results.
      * @return List of Strings with names of all tables containing periodic results
      */
     public List<String> listPeriodicTables();
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
index 4988035..70c8b0e 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.openrdf.query.BindingSet;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -103,16 +104,6 @@ public interface PrecomputedJoinStorage extends AutoCloseable {
     public void close() throws PCJStorageException;
 
     /**
-     * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources
-     * that need to be released once you are done iterating.
-     *
-     * @param <T> The type of object that is iterated over.
-     */
-    public static interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
-
-    }
-
-    /**
      * An operation of {@link PrecomputedJoinStorage} failed.
      */
     public static class PCJStorageException extends PcjException {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
index 3d0f11b..f3d078d 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
@@ -39,6 +39,7 @@ import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryExce
 import org.apache.rya.api.instance.RyaDetailsUpdater;
 import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
index f8547f5..8124aff 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
@@ -38,11 +38,11 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.io.Text;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.openrdf.model.impl.LiteralImpl;
 import org.openrdf.model.vocabulary.XMLSchema;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
index ff8ff14..c488d36 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
@@ -25,8 +25,8 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.openrdf.query.BindingSet;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
index 40db32a..9346c00 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -56,10 +56,9 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.QueryEvaluationException;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
index 26fd8c9..b457dfd 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
@@ -27,7 +27,7 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.openrdf.query.BindingSet;
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
index e689f9d..b95c812 100644
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java
@@ -45,9 +45,9 @@ import org.apache.rya.accumulo.MiniAccumuloSingleton;
 import org.apache.rya.accumulo.RyaTestInstanceRule;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
@@ -120,7 +120,7 @@ public class PcjTablesIT {
     private String getRyaInstanceName() {
         return testInstance.getRyaInstanceName();
     }
-    
+
     /**
      * Format a Mini Accumulo to be a Rya repository.
      *

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
index 5ba5e40..33571f7 100644
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
@@ -39,9 +39,9 @@ import org.apache.rya.api.instance.RyaDetailsRepository;
 import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
 import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
index 1e21bf2..2d9da4d 100644
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java
@@ -30,10 +30,10 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.rya.accumulo.AccumuloITBase;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.PeriodicQueryTableNameFactory;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -53,86 +53,86 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
     private static final String RYA = "rya_";
     private static final PeriodicQueryTableNameFactory nameFactory = new PeriodicQueryTableNameFactory();
     private static final ValueFactory vf = new ValueFactoryImpl();
-    
+
     @Before
     public void init() throws AccumuloException, AccumuloSecurityException {
         super.getConnector().securityOperations().changeUserAuthorizations("root", new Authorizations("U"));
         periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getConnector(), RYA);
     }
-    
-    
+
+
     @Test
     public void testCreateAndMeta() throws PeriodicQueryStorageException {
-        
+
         String sparql = "select ?x where { ?x <urn:pred> ?y.}";
         VariableOrder varOrder = new VariableOrder("periodicBinId", "x");
         PeriodicQueryStorageMetadata expectedMeta = new PeriodicQueryStorageMetadata(sparql, varOrder);
-        
+
         String id = periodicStorage.createPeriodicQuery(sparql);
         Assert.assertEquals(expectedMeta, periodicStorage.getPeriodicQueryMetadata(id));
         Assert.assertEquals(Arrays.asList(nameFactory.makeTableName(RYA, id)), periodicStorage.listPeriodicTables());
         periodicStorage.deletePeriodicQuery(id);
     }
-    
-    
+
+
     @Test
     public void testAddListDelete() throws Exception {
-        
+
         String sparql = "select ?x where { ?x <urn:pred> ?y.}";
         String id = periodicStorage.createPeriodicQuery(sparql);
-        
+
         Set<BindingSet> expected = new HashSet<>();
         Set<VisibilityBindingSet> storageSet = new HashSet<>();
-        
+
         //add result matching user's visibility
         QueryBindingSet bs = new QueryBindingSet();
         bs.addBinding("periodicBinId", vf.createLiteral(1L));
         bs.addBinding("x",vf.createURI("uri:uri123"));
         expected.add(bs);
         storageSet.add(new VisibilityBindingSet(bs,"U"));
-        
+
         //add result with different visibility that is not expected
         bs = new QueryBindingSet();
         bs.addBinding("periodicBinId", vf.createLiteral(1L));
         bs.addBinding("x",vf.createURI("uri:uri456"));
         storageSet.add(new VisibilityBindingSet(bs,"V"));
-        
+
         periodicStorage.addPeriodicQueryResults(id, storageSet);
-        
+
         Set<BindingSet> actual = new HashSet<>();
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(1L))) {
             iter.forEachRemaining(x -> actual.add(x));
         }
-        
+
         Assert.assertEquals(expected, actual);
-        
+
         periodicStorage.deletePeriodicQueryResults(id, 1L);
-        
+
         Set<BindingSet> actual2 = new HashSet<>();
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(1L))) {
             iter.forEachRemaining(x -> actual2.add(x));
         }
-        
+
         Assert.assertEquals(new HashSet<>(), actual2);
         periodicStorage.deletePeriodicQuery(id);
-        
+
     }
-    
+
     @Test
     public void multiBinTest() throws PeriodicQueryStorageException, Exception {
-        
+
         String sparql = "prefix function: <http://org.apache.rya/function#> " //n
                 + "prefix time: <http://www.w3.org/2006/time#> " //n
                 + "select ?id (count(?obs) as ?total) where {" //n
                 + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
                 + "?obs <uri:hasTime> ?time. " //n
                 + "?obs <uri:hasId> ?id } group by ?id"; //n
-        
-        
+
+
         final ValueFactory vf = new ValueFactoryImpl();
         long currentTime = System.currentTimeMillis();
         String queryId = UUID.randomUUID().toString().replace("-", "");
-        
+
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final Set<BindingSet> expected1 = new HashSet<>();
         final Set<BindingSet> expected2 = new HashSet<>();
@@ -142,81 +142,81 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
 
         long period = 1800000;
         long binId = (currentTime/period)*period;
-        
+
         MapBindingSet bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expected1.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expected1.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expected1.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expected1.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expected2.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expected2.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expected2.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
         expected3.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
         expected3.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
         expected4.add(bs);
         storageResults.add(new VisibilityBindingSet(bs));
-        
-        
+
+
         String id = periodicStorage.createPeriodicQuery(queryId, sparql);
         periodicStorage.addPeriodicQueryResults(queryId, storageResults);
-        
+
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) {
             Set<BindingSet> actual1 = new HashSet<>();
             while(iter.hasNext()) {
@@ -224,7 +224,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
             }
             Assert.assertEquals(expected1, actual1);
         }
-        
+
         periodicStorage.deletePeriodicQueryResults(queryId, binId);
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) {
             Set<BindingSet> actual1 = new HashSet<>();
@@ -233,7 +233,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
             }
             Assert.assertEquals(Collections.emptySet(), actual1);
         }
-        
+
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) {
             Set<BindingSet> actual2 = new HashSet<>();
             while(iter.hasNext()) {
@@ -241,7 +241,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
             }
             Assert.assertEquals(expected2, actual2);
         }
-        
+
         periodicStorage.deletePeriodicQueryResults(queryId, binId + period);
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) {
             Set<BindingSet> actual2 = new HashSet<>();
@@ -250,7 +250,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
             }
             Assert.assertEquals(Collections.emptySet(), actual2);
         }
-        
+
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 2*period))) {
             Set<BindingSet> actual3 = new HashSet<>();
             while(iter.hasNext()) {
@@ -258,7 +258,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase {
             }
             Assert.assertEquals(expected3, actual3);
         }
-        
+
         try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 3*period))) {
             Set<BindingSet> actual4 = new HashSet<>();
             while(iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index f2e8cf9..5493a5f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -62,7 +62,7 @@ under the License.
         
         <dependency>
             <groupId>org.apache.rya</groupId>
-            <artifactId>rya.api.function</artifactId>
+            <artifactId>rya.api.evaluation</artifactId>
         </dependency>
 
         <!-- 3rd Party Runtime Dependencies. -->

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 3fea6ed..181f322 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -33,12 +33,12 @@ import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
 import org.openrdf.model.Statement;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index d623043..866d32b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -29,10 +29,10 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 3e72f1b..610f502 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -39,11 +39,11 @@ import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.api.functions.DateTimeWithinPeriod;
 import org.apache.rya.api.functions.OWLTime;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index 5cd3ab1..65083e8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -27,10 +27,10 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
index e83a894..6135920 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -28,9 +28,9 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.log4j.Logger;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index 8529bd5..90ed01a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -47,12 +47,12 @@ import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml
index 250028f..55c0e79 100644
--- a/extras/rya.streams/api/pom.xml
+++ b/extras/rya.streams/api/pom.xml
@@ -54,11 +54,6 @@ under the License.
             <artifactId>guava</artifactId>
         </dependency>
         
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        
         <!-- Test dependences -->
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index 778630d..16b07b2 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -60,7 +60,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
-            <artifactId>rya.api.function</artifactId>
+            <artifactId>rya.api.evaluation</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
index 5f7a06b..124bc76 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
@@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * {@link VisibilityBindingSet} because some downstream processors require more information about
  * which upstream processor is emitting the result in order to do their work.
  * </p>
- * Currently there are only two types processors:
+ * Currently there are only two types of processors:
  * <ul>
  *   <li>Unary Processor - A processor that only has a single upstream node feeding it input.</li>
  *   <li>Binary Processor - A processor that has two upstream nodes feeding it input.</li>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
deleted file mode 100644
index 9ea927d..0000000
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.join;
-
-import java.util.Iterator;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * An {@link Iterator} that is also {@link AutoCloseable}.
- *
- * @param <T> - The type of elements that will be iterated over.
- */
-@DefaultAnnotation(NonNull.class)
-public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
index 9ed2363..367ca6f 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.rya.api.function.join.IterativeJoin;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.streams.kafka.processors.ProcessorResult;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
@@ -75,7 +76,7 @@ public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier {
         this.allVars = requireNonNull(allVars);
 
         if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
-            throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
+            throw new IllegalArgumentException("The allVars list must start with the joinVars list, but it did not. " +
                     "Join Vars: " + joinVars + ", All Vars: " + allVars);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
index 17a6ebb..2afc1d8 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
@@ -19,6 +19,7 @@
 package org.apache.rya.streams.kafka.processors.join;
 
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
index d12957a..254f226 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
 import org.openrdf.query.impl.MapBindingSet;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/pom.xml b/extras/rya.streams/pom.xml
index dd876a0..93b6b1c 100644
--- a/extras/rya.streams/pom.xml
+++ b/extras/rya.streams/pom.xml
@@ -38,7 +38,15 @@
         <module>kafka-test</module>
         <module>api</module>
         <module>client</module>
-        <module>geo</module>
         <module>integration</module>
     </modules>
+    
+    <profiles>
+        <profile>
+            <id>geoindexing</id>
+            <modules>
+                <module>geo</module>
+           </modules>
+        </profile>
+    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 31b17f8..99640f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -217,7 +217,7 @@ under the License.
             </dependency>
             <dependency>
                 <groupId>org.apache.rya</groupId>
-                <artifactId>rya.api.function</artifactId>
+                <artifactId>rya.api.evaluation</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>