You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/10/12 18:43:23 UTC

[02/11] incubator-rya git commit: RYA-402 Create Kafka reusable test code project. Closes #242.

RYA-402 Create Kafka reusable test code project. Closes #242.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/4089e706
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/4089e706
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/4089e706

Branch: refs/heads/master
Commit: 4089e706ca17c54ced3723602672f1a69d2aa2be
Parents: 6dd81bd
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Oct 10 18:36:25 2017 -0400
Committer: jdasch <hc...@gmail.com>
Committed: Thu Oct 12 12:51:38 2017 -0400

----------------------------------------------------------------------
 .../pcj/matching/AccumuloIndexSetProvider.java  |   3 +-
 ...dicNotificationApplicationConfiguration.java |   2 +-
 .../exporter/KafkaExporterExecutor.java         |   2 +-
 .../KafkaPeriodicBindingSetExporter.java        |   2 +-
 .../notification/pruner/AccumuloBinPruner.java  |   2 +-
 .../pruner/PeriodicQueryPruner.java             |   2 +-
 extras/periodic.notification/tests/pom.xml      |   4 +
 .../PeriodicNotificationApplicationIT.java      | 288 ++++++++++---------
 .../PeriodicNotificationExporterIT.java         |   4 +-
 .../PeriodicCommandNotificationConsumerIT.java  |  38 +--
 .../app/batch/AbstractSpanBatchInformation.java |   2 +-
 .../fluo/app/batch/JoinBatchInformation.java    |   2 +-
 .../export/kafka/KafkaExportParameterBase.java  |   2 +-
 .../rya/kafka/base/EmbeddedKafkaInstance.java   | 143 ---------
 .../rya/kafka/base/EmbeddedKafkaSingleton.java  |  87 ------
 .../org/apache/rya/kafka/base/KafkaITBase.java  |  38 ---
 .../rya/kafka/base/KafkaTestInstanceRule.java   |  98 -------
 pom.xml                                         |  11 +
 test/kafka/pom.xml                              |  81 ++++++
 .../rya/test/kafka/EmbeddedKafkaInstance.java   | 142 +++++++++
 .../rya/test/kafka/EmbeddedKafkaSingleton.java  |  87 ++++++
 .../org/apache/rya/test/kafka/KafkaITBase.java  |  38 +++
 .../rya/test/kafka/KafkaTestInstanceRule.java   |  98 +++++++
 .../org/apache/rya/test/kafka/PortUtils.java    |  44 +++
 test/pom.xml                                    |  39 +++
 25 files changed, 721 insertions(+), 538 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
index 1940e64..40e2c77 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
@@ -52,11 +52,10 @@ import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.algebra.TupleExpr;
 import org.openrdf.sail.SailException;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import jline.internal.Preconditions;
-
 /**
  * Implementation of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s.
  * This provider uses either user specified Accumulo configuration information or user a specified

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
index d69efe5..ff58979 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
@@ -22,7 +22,7 @@ import java.util.Properties;
 
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * Configuration object for creating a {@link PeriodicNotificationApplication}.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
index c2e5ebf..3b639e9 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
@@ -32,7 +32,7 @@ import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import org.apache.rya.periodic.notification.api.LifeCycle;
 import org.openrdf.query.BindingSet;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s.  

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
index 8a0322f..5397618 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
@@ -36,7 +36,7 @@ import org.apache.rya.periodic.notification.api.BindingSetRecordExportException;
 import org.openrdf.model.Literal;
 import org.openrdf.query.BindingSet;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * Object that exports {@link BindingSet}s to the Kafka topic indicated by

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
index 4dac64c..a9403c2 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
@@ -24,7 +24,7 @@ import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.NodeBin;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * Deletes BindingSets from time bins in the indicated PCJ table

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
index 516690e..327154a 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
@@ -32,7 +32,7 @@ import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.NodeBin;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * Implementation of {@link BinPruner} that deletes old, already processed

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/pom.xml b/extras/periodic.notification/tests/pom.xml
index 229a761..feb1f0f 100644
--- a/extras/periodic.notification/tests/pom.xml
+++ b/extras/periodic.notification/tests/pom.xml
@@ -26,6 +26,10 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.rya</groupId>
+            <artifactId>rya.test.kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
             <artifactId>rya.pcj.fluo.test.base</artifactId>
             <exclusions>
                 <exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
index 9109775..3b6062f 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -18,6 +18,9 @@
  */
 package org.apache.rya.periodic.notification.application;
 
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
+
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -60,14 +63,14 @@ import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
-import org.apache.rya.kafka.base.EmbeddedKafkaInstance;
-import org.apache.rya.kafka.base.EmbeddedKafkaSingleton;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
 import org.apache.rya.periodic.notification.notification.CommandNotification;
 import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
 import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
 import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.apache.rya.test.kafka.EmbeddedKafkaInstance;
+import org.apache.rya.test.kafka.EmbeddedKafkaSingleton;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -85,10 +88,7 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
-import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;;
+import com.google.common.collect.Sets;;
 
 
 public class PeriodicNotificationApplicationIT extends RyaExportITBase {
@@ -101,62 +101,64 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
     private PeriodicNotificationApplicationConfiguration conf;
     private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
     private static String bootstrapServers;
-    
+
     @Rule
     public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
-    
+
     @BeforeClass
     public static void initClass() {
         bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
     }
-    
+
     @Before
     public void init() throws Exception {
-        String topic = rule.getKafkaTopicName();
+        final String topic = rule.getKafkaTopicName();
         rule.createTopic(topic);
-        
+
         //get user specified props and update with the embedded kafka bootstrap servers and rule generated topic
         props = getProps();
         props.setProperty(NOTIFICATION_TOPIC, topic);
         props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
         conf = new PeriodicNotificationApplicationConfiguration(props);
-        
+
         //create Kafka Producer
         kafkaProps = getKafkaProperties(conf);
         producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());
-        
+
         //extract kafka specific properties from application config
         app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
         registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
     }
-    
+
     @Test
     public void periodicApplicationWithAggAndGroupByTest() throws Exception {
 
-        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+        final String sparql = "prefix function: <http://org.apache.rya/function#> " // n
                 + "prefix time: <http://www.w3.org/2006/time#> " // n
                 + "select ?type (count(?obs) as ?total) where {" // n
                 + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
                 + "?obs <uri:hasTime> ?time. " // n
                 + "?obs <uri:hasObsType> ?type } group by ?type"; // n
-        
+
         //make data
-        int periodMult = 15;
+        final int periodMult = 15;
         final ValueFactory vf = new ValueFactoryImpl();
         final DatatypeFactory dtf = DatatypeFactory.newInstance();
         //Sleep until current time aligns nicely with period to make
         //results more predictable
-        while(System.currentTimeMillis() % (periodMult*1000) > 500);
-        ZonedDateTime time = ZonedDateTime.now();
+        while(System.currentTimeMillis() % (periodMult*1000) > 500) {
+            ;
+        }
+        final ZonedDateTime time = ZonedDateTime.now();
 
-        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
-        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
 
-        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
-        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
 
-        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
-        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
 
         final Collection<Statement> statements = Sets.newHashSet(
                 vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
@@ -174,26 +176,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"),
                         vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
                 vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile")));
-        
+
         try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
-            Connector connector = ConfigUtils.getConnector(conf);
-            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
-            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
-            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+            final Connector connector = ConfigUtils.getConnector(conf);
+            final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
             addData(statements);
             app.start();
-           
-            Multimap<Long, BindingSet> actual = HashMultimap.create();
+
+            final Multimap<Long, BindingSet> actual = HashMultimap.create();
             try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
                 consumer.subscribe(Arrays.asList(id));
-                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                final long end = System.currentTimeMillis() + 4*periodMult*1000;
                 long lastBinId = 0L;
                 long binId = 0L;
-                List<Long> ids = new ArrayList<>();
+                final List<Long> ids = new ArrayList<>();
                 while (System.currentTimeMillis() < end) {
-                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
-                    for(ConsumerRecord<String, BindingSet> record: records){
-                        BindingSet result = record.value();
+                    final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(final ConsumerRecord<String, BindingSet> record: records){
+                        final BindingSet result = record.value();
                         binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
                         if(lastBinId != binId) {
                             lastBinId = binId;
@@ -202,103 +204,105 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
                         actual.put(binId, result);
                     }
                 }
-                
-                Map<Long, Set<BindingSet>> expected = new HashMap<>();
-                
-                Set<BindingSet> expected1 = new HashSet<>();
-                QueryBindingSet bs1 = new QueryBindingSet();
+
+                final Map<Long, Set<BindingSet>> expected = new HashMap<>();
+
+                final Set<BindingSet> expected1 = new HashSet<>();
+                final QueryBindingSet bs1 = new QueryBindingSet();
                 bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
                 bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
                 bs1.addBinding("type", vf.createLiteral("airplane"));
-                
-                QueryBindingSet bs2 = new QueryBindingSet();
+
+                final QueryBindingSet bs2 = new QueryBindingSet();
                 bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
                 bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
                 bs2.addBinding("type", vf.createLiteral("ship"));
-                
-                QueryBindingSet bs3 = new QueryBindingSet();
+
+                final QueryBindingSet bs3 = new QueryBindingSet();
                 bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
                 bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
                 bs3.addBinding("type", vf.createLiteral("automobile"));
-                
+
                 expected1.add(bs1);
                 expected1.add(bs2);
                 expected1.add(bs3);
-                
-                Set<BindingSet> expected2 = new HashSet<>();
-                QueryBindingSet bs4 = new QueryBindingSet();
+
+                final Set<BindingSet> expected2 = new HashSet<>();
+                final QueryBindingSet bs4 = new QueryBindingSet();
                 bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
                 bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
                 bs4.addBinding("type", vf.createLiteral("airplane"));
-                
-                QueryBindingSet bs5 = new QueryBindingSet();
+
+                final QueryBindingSet bs5 = new QueryBindingSet();
                 bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
                 bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
                 bs5.addBinding("type", vf.createLiteral("ship"));
-                
+
                 expected2.add(bs4);
                 expected2.add(bs5);
-                
-                Set<BindingSet> expected3 = new HashSet<>();
-                QueryBindingSet bs6 = new QueryBindingSet();
+
+                final Set<BindingSet> expected3 = new HashSet<>();
+                final QueryBindingSet bs6 = new QueryBindingSet();
                 bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
                 bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
                 bs6.addBinding("type", vf.createLiteral("ship"));
-                
-                QueryBindingSet bs7 = new QueryBindingSet();
+
+                final QueryBindingSet bs7 = new QueryBindingSet();
                 bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
                 bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
                 bs7.addBinding("type", vf.createLiteral("airplane"));
-                
+
                 expected3.add(bs6);
                 expected3.add(bs7);
-                
+
                 expected.put(ids.get(0), expected1);
                 expected.put(ids.get(1), expected2);
                 expected.put(ids.get(2), expected3);
-                
+
                 Assert.assertEquals(3, actual.asMap().size());
-                for(Long ident: ids) {
+                for(final Long ident: ids) {
                     Assert.assertEquals(expected.get(ident), actual.get(ident));
                 }
             }
-            
-            Set<BindingSet> expectedResults = new HashSet<>();
+
+            final Set<BindingSet> expectedResults = new HashSet<>();
             try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
                 results.forEachRemaining(x -> expectedResults.add(x));
                 Assert.assertEquals(0, expectedResults.size());
             }
         }
     }
-    
-    
+
+
     @Test
     public void periodicApplicationWithAggTest() throws Exception {
 
-        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+        final String sparql = "prefix function: <http://org.apache.rya/function#> " // n
                 + "prefix time: <http://www.w3.org/2006/time#> " // n
                 + "select (count(?obs) as ?total) where {" // n
                 + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
                 + "?obs <uri:hasTime> ?time. " // n
                 + "?obs <uri:hasId> ?id } "; // n
-        
+
         //make data
-        int periodMult = 15;
+        final int periodMult = 15;
         final ValueFactory vf = new ValueFactoryImpl();
         final DatatypeFactory dtf = DatatypeFactory.newInstance();
         //Sleep until current time aligns nicely with period to make
         //results more predictable
-        while(System.currentTimeMillis() % (periodMult*1000) > 500);
-        ZonedDateTime time = ZonedDateTime.now();
+        while(System.currentTimeMillis() % (periodMult*1000) > 500) {
+            ;
+        }
+        final ZonedDateTime time = ZonedDateTime.now();
 
-        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
-        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
 
-        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
-        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
 
-        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
-        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
 
         final Collection<Statement> statements = Sets.newHashSet(
                 vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
@@ -310,26 +314,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
                         vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
                 vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
-        
+
         try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
-            Connector connector = ConfigUtils.getConnector(conf);
-            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
-            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
-            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+            final Connector connector = ConfigUtils.getConnector(conf);
+            final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
             addData(statements);
             app.start();
-            
-            Multimap<Long, BindingSet> expected = HashMultimap.create();
+
+            final Multimap<Long, BindingSet> expected = HashMultimap.create();
             try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
                 consumer.subscribe(Arrays.asList(id));
-                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                final long end = System.currentTimeMillis() + 4*periodMult*1000;
                 long lastBinId = 0L;
                 long binId = 0L;
-                List<Long> ids = new ArrayList<>();
+                final List<Long> ids = new ArrayList<>();
                 while (System.currentTimeMillis() < end) {
-                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
-                    for(ConsumerRecord<String, BindingSet> record: records){
-                        BindingSet result = record.value();
+                    final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(final ConsumerRecord<String, BindingSet> record: records){
+                        final BindingSet result = record.value();
                         binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
                         if(lastBinId != binId) {
                             lastBinId = binId;
@@ -338,21 +342,21 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
                         expected.put(binId, result);
                     }
                 }
-                
+
                 Assert.assertEquals(3, expected.asMap().size());
                 int i = 0;
-                for(Long ident: ids) {
+                for(final Long ident: ids) {
                     Assert.assertEquals(1, expected.get(ident).size());
-                    BindingSet bs = expected.get(ident).iterator().next();
-                    Value val = bs.getValue("total");
-                    int total = Integer.parseInt(val.stringValue());
+                    final BindingSet bs = expected.get(ident).iterator().next();
+                    final Value val = bs.getValue("total");
+                    final int total = Integer.parseInt(val.stringValue());
                     Assert.assertEquals(3-i, total);
                     i++;
                 }
             }
-            
-            
-            Set<BindingSet> expectedResults = new HashSet<>();
+
+
+            final Set<BindingSet> expectedResults = new HashSet<>();
             try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
                 results.forEachRemaining(x -> expectedResults.add(x));
                 Assert.assertEquals(0, expectedResults.size());
@@ -360,35 +364,37 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
         }
 
     }
-    
-    
+
+
     @Test
     public void periodicApplicationTest() throws Exception {
 
-        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+        final String sparql = "prefix function: <http://org.apache.rya/function#> " // n
                 + "prefix time: <http://www.w3.org/2006/time#> " // n
                 + "select ?obs ?id where {" // n
                 + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
                 + "?obs <uri:hasTime> ?time. " // n
                 + "?obs <uri:hasId> ?id } "; // n
-        
+
         //make data
-        int periodMult = 15;
+        final int periodMult = 15;
         final ValueFactory vf = new ValueFactoryImpl();
         final DatatypeFactory dtf = DatatypeFactory.newInstance();
         //Sleep until current time aligns nicely with period to make
         //results more predictable
-        while(System.currentTimeMillis() % (periodMult*1000) > 500);
-        ZonedDateTime time = ZonedDateTime.now();
+        while(System.currentTimeMillis() % (periodMult*1000) > 500) {
+            ;
+        }
+        final ZonedDateTime time = ZonedDateTime.now();
 
-        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
-        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
 
-        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
-        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
 
-        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
-        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+        final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
 
         final Collection<Statement> statements = Sets.newHashSet(
                 vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
@@ -400,26 +406,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
                         vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
                 vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
-        
+
         try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
-            Connector connector = ConfigUtils.getConnector(conf);
-            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
-            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
-            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+            final Connector connector = ConfigUtils.getConnector(conf);
+            final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
             addData(statements);
             app.start();
-           
-            Multimap<Long, BindingSet> expected = HashMultimap.create();
+
+            final Multimap<Long, BindingSet> expected = HashMultimap.create();
             try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
                 consumer.subscribe(Arrays.asList(id));
-                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                final long end = System.currentTimeMillis() + 4*periodMult*1000;
                 long lastBinId = 0L;
                 long binId = 0L;
-                List<Long> ids = new ArrayList<>();
+                final List<Long> ids = new ArrayList<>();
                 while (System.currentTimeMillis() < end) {
-                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
-                    for(ConsumerRecord<String, BindingSet> record: records){
-                        BindingSet result = record.value();
+                    final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(final ConsumerRecord<String, BindingSet> record: records){
+                        final BindingSet result = record.value();
                         binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
                         if(lastBinId != binId) {
                             lastBinId = binId;
@@ -428,17 +434,17 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
                         expected.put(binId, result);
                     }
                 }
-                
+
                 Assert.assertEquals(3, expected.asMap().size());
                 int i = 0;
-                for(Long ident: ids) {
+                for(final Long ident: ids) {
                     Assert.assertEquals(3-i, expected.get(ident).size());
                     i++;
                 }
             }
-            
-            
-            Set<BindingSet> expectedResults = new HashSet<>();
+
+
+            final Set<BindingSet> expectedResults = new HashSet<>();
             try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
                 results.forEachRemaining(x -> expectedResults.add(x));
                 Assert.assertEquals(0, expectedResults.size());
@@ -446,40 +452,40 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
         }
 
     }
-    
-    
+
+
     @After
     public void shutdown() {
         registrar.close();
         app.stop();
     }
-    
-    private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
+
+    private void addData(final Collection<Statement> statements) throws DatatypeConfigurationException {
         // add statements to Fluo
         try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
-            InsertTriples inserter = new InsertTriples();
+            final InsertTriples inserter = new InsertTriples();
             statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
             getMiniFluo().waitForObservers();
         }
     }
 
-    private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
-        Properties kafkaProps = new Properties();
+    private static Properties getKafkaProperties(final PeriodicNotificationApplicationConfiguration conf) {
+        final Properties kafkaProps = new Properties();
         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         return kafkaProps;
     }
-    
+
     private Properties getProps() throws IOException {
-        
-        Properties props = new Properties();
+
+        final Properties props = new Properties();
         try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) {
             props.load(in);
-        } 
-        
-        FluoConfiguration fluoConf = getFluoConfiguration();
+        }
+
+        final FluoConfiguration fluoConf = getFluoConfiguration();
         props.setProperty("accumulo.user", getUsername());
         props.setProperty("accumulo.password", getPassword());
         props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
index 874e7e2..82338b9 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -34,10 +34,10 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.kafka.base.KafkaITBase;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
 import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
index 522e69d..1fb6167 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
@@ -30,13 +30,13 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.log4j.BasicConfigurator;
-import org.apache.rya.kafka.base.KafkaITBase;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
 import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.notification.CommandNotification;
 import org.apache.rya.periodic.notification.notification.TimestampedNotification;
 import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
 import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -48,10 +48,10 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
     private PeriodicNotificationCoordinatorExecutor coord;
     private KafkaNotificationProvider provider;
     private String bootstrapServer;
-    
+
     @Rule
     public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
-    
+
     @Before
     public void init() throws Exception {
         bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
@@ -62,12 +62,12 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
 
         BasicConfigurator.configure();
 
-        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
-        Properties props = createKafkaConfig();
-        KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
-        String topic = rule.getKafkaTopicName();
+        final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        final Properties props = createKafkaConfig();
+        final KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+        final String topic = rule.getKafkaTopicName();
         rule.createTopic(topic);
-        
+
         registration = new KafkaNotificationRegistrationClient(topic, producer);
         coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
         provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -80,11 +80,11 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
 
         registration.deleteNotification("1");
         Thread.sleep(2000);
-        int size = notifications.size();
+        final int size = notifications.size();
         // sleep for 2 seconds to ensure no more messages being produced
         Thread.sleep(2000);
         Assert.assertEquals(size, notifications.size());
-        
+
         tearDown();
     }
 
@@ -93,12 +93,12 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
 
         BasicConfigurator.configure();
 
-        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
-        Properties props = createKafkaConfig();
-        KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
-        String topic = rule.getKafkaTopicName();
+        final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        final Properties props = createKafkaConfig();
+        final KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+        final String topic = rule.getKafkaTopicName();
         rule.createTopic(topic);
-        
+
         registration = new KafkaNotificationRegistrationClient(topic, producer);
         coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
         provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -111,11 +111,11 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
 
         registration.deleteNotification("1");
         Thread.sleep(2000);
-        int size = notifications.size();
+        final int size = notifications.size();
         // sleep for 2 seconds to ensure no more messages being produced
         Thread.sleep(2000);
         Assert.assertEquals(size, notifications.size());
-        
+
         tearDown();
     }
 
@@ -126,7 +126,7 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
     }
 
     private Properties createKafkaConfig() {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
index 498dd85..4933d57 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
@@ -23,7 +23,7 @@ import java.util.Objects;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * Abstract class for generating span based notifications.  A spanned notification

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
index d049ff0..3354fdc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -27,7 +27,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.openrdf.query.Binding;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * This class updates join results based on parameters specified for the join's

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
index aab3929..8686c85 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
@@ -26,7 +26,7 @@ import org.apache.fluo.api.observer.Observer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 /**
  * Provides read/write functions to the parameters map that is passed into an

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
deleted file mode 100644
index 97d8b90..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.nio.file.Files;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.fluo.core.util.PortUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaConfig$;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.zk.EmbeddedZookeeper;
-
-/**
- * This class provides a {@link KafkaServer} and a dedicated
- * {@link EmbeddedZookeeper} server for integtration testing. Both servers use a
- * random free port, so it is necesssary to use the
- * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()}
- * methods to determine how to connect to them.
- *
- */
-public class EmbeddedKafkaInstance {
-
-    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
-
-    private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1);
-    private static final String IPv4_LOOPBACK = "127.0.0.1";
-    private static final String ZKHOST = IPv4_LOOPBACK;
-    private static final String BROKERHOST = IPv4_LOOPBACK;
-    private KafkaServer kafkaServer;
-    private EmbeddedZookeeper zkServer;
-    private String brokerPort;
-    private String zookeperConnect;
-
-    /**
-     * Starts the Embedded Kafka and Zookeeper Servers.
-     * @throws Exception - If an exeption occurs during startup.
-     */
-    protected void startup() throws Exception {
-        // Setup the embedded zookeeper
-        logger.info("Starting up Embedded Zookeeper...");
-        zkServer = new EmbeddedZookeeper();
-        zookeperConnect = ZKHOST + ":" + zkServer.port();
-        logger.info("Embedded Zookeeper started at: {}", zookeperConnect);
-
-        // setup Broker
-        logger.info("Starting up Embedded Kafka...");
-        brokerPort = Integer.toString(PortUtils.getRandomFreePort());
-        final Properties brokerProps = new Properties();
-        brokerProps.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), "0");
-        brokerProps.setProperty(KafkaConfig$.MODULE$.HostNameProp(), BROKERHOST);
-        brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort);
-        brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect);
-        brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString());
-        final KafkaConfig config = new KafkaConfig(brokerProps);
-        final Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-        logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort);
-    }
-
-    /**
-     * Shutdown the Embedded Kafka and Zookeeper.
-     * @throws Exception
-     */
-    protected void shutdown() throws Exception {
-        try {
-            if(kafkaServer != null) {
-                kafkaServer.shutdown();
-            }
-        } finally {
-            if(zkServer != null) {
-                zkServer.shutdown();
-            }
-        }
-    }
-
-    /**
-     * @return A new Property object containing the correct value of
-     *         {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}, for
-     *         connecting to this instance.
-     */
-    public Properties createBootstrapServerConfig() {
-        final Properties config = new Properties();
-        config.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + brokerPort);
-        return config;
-    }
-
-    /**
-     *
-     * @return The host of the Kafka Broker.
-     */
-    public String getBrokerHost() {
-        return BROKERHOST;
-    }
-
-    /**
-     *
-     * @return The port of the Kafka Broker.
-     */
-    public String getBrokerPort() {
-        return brokerPort;
-    }
-
-    /**
-     *
-     * @return The Zookeeper Connect String.
-     */
-    public String getZookeeperConnect() {
-        return zookeperConnect;
-    }
-
-    /**
-     *
-     * @return A unique Kafka topic name for this instance.
-     */
-    public String getUniqueTopicName() {
-        return "topic_" + KAFKA_TOPIC_COUNTER.getAndIncrement() + "_";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
deleted file mode 100644
index 933377b..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides a singleton instance of an {@link EmbeddedKafkaInstance} and
- * includes a shutdown hook to ensure any open resources are closed on JVM exit.
- * <p>
- * This class is derived from MiniAccumuloSingleton.
- */
-public class EmbeddedKafkaSingleton {
-
-    public static EmbeddedKafkaInstance getInstance() {
-        return InstanceHolder.SINGLETON.instance;
-    }
-
-    private EmbeddedKafkaSingleton() {
-        // hiding implicit default constructor
-    }
-
-    private enum InstanceHolder {
-
-        SINGLETON;
-
-        private final Logger log;
-        private final EmbeddedKafkaInstance instance;
-
-        InstanceHolder() {
-            this.log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
-            this.instance = new EmbeddedKafkaInstance();
-            try {
-                this.instance.startup();
-
-                // JUnit does not have an overall lifecycle event for tearing down
-                // this kind of resource, but shutdown hooks work alright in practice
-                // since this should only be used during testing
-
-                // The only other alternative for lifecycle management is to use a
-                // suite lifecycle to enclose the tests that need this resource.
-                // In practice this becomes unwieldy.
-
-                Runtime.getRuntime().addShutdownHook(new Thread() {
-                    @Override
-                    public void run() {
-                        try {
-                            InstanceHolder.this.instance.shutdown();
-                        } catch (final Throwable t) {
-                            // logging frameworks will likely be shut down
-                            t.printStackTrace(System.err);
-                        }
-                    }
-                });
-
-            } catch (final InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.error("Interrupted while starting EmbeddedKafkaInstance", e);
-            } catch (final IOException e) {
-                log.error("Unexpected error while starting EmbeddedKafkaInstance", e);
-            } catch (final Throwable e) {
-                // catching throwable because failure to construct an enum
-                // instance will lead to another error being thrown downstream
-                log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
deleted file mode 100644
index da4526c..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.util.Properties;
-
-/**
- * A class intended to be extended for Kafka Integration tests.
- */
-public class KafkaITBase {
-
-    private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
-
-    /**
-     * @return A new Property object containing the correct value for Kafka's
-     *         {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
-     */
-    protected Properties createBootstrapServerConfig() {
-        return embeddedKafka.createBootstrapServerConfig();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
deleted file mode 100644
index a9ee7b5..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.rules.ExternalResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-
-
-/**
- * Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}.
- *
- */
-public class KafkaTestInstanceRule extends ExternalResource {
-    private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class);
-    private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance();
-    private String kafkaTopicName;
-    private final boolean createTopic;
-
-    /**
-     * @param createTopic - If true, a topic shall be created for the value
-     *            provided by {@link #getKafkaTopicName()}. If false, no topics
-     *            shall be created.
-     */
-    public KafkaTestInstanceRule(final boolean createTopic) {
-        this.createTopic = createTopic;
-    }
-
-    /**
-     * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as
-     *         a prefix.
-     */
-    public String getKafkaTopicName() {
-        if (kafkaTopicName == null) {
-            throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution.");
-        }
-        return kafkaTopicName;
-    }
-
-    @Override
-    protected void before() throws Throwable {
-        // Get the next kafka topic name.
-        kafkaTopicName = kafkaInstance.getUniqueTopicName();
-
-        if(createTopic) {
-            createTopic(kafkaTopicName);
-        }
-    }
-
-    @Override
-    protected void after() {
-        kafkaTopicName = null;
-    }
-
-    /**
-     * Utility method to provide additional unique topics if they are required.
-     * @param topicName - The Kafka topic to create.
-     */
-    public void createTopic(final String topicName) {
-        // Setup Kafka.
-        ZkUtils zkUtils = null;
-        try {
-            logger.info("Creating Kafka Topic: '{}'", topicName);
-            zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
-            AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-        }
-        finally {
-            if(zkUtils != null) {
-                zkUtils.close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92cfba8..3dc7c68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@ under the License.
         <module>pig</module>
         <module>sail</module>
         <module>spark</module>
+        <module>test</module>
         <module>web</module>
     </modules>
     <properties>
@@ -280,6 +281,16 @@ under the License.
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.test.parent</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.test.kafka</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.accumulo</groupId>
                 <artifactId>accumulo-core</artifactId>
                 <version>${accumulo.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/test/kafka/pom.xml b/test/kafka/pom.xml
new file mode 100644
index 0000000..44773a7
--- /dev/null
+++ b/test/kafka/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.test.parent</artifactId>
+        <version>3.2.12-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.test.kafka</artifactId>
+    
+    <name>Apache Rya Test Kafka</name>
+    <description>
+        This module contains the Rya Test Kafka components that help write Kafka
+        based integration tests.
+    </description>
+
+    <dependencies>
+        <!-- Kafka dependencies. -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <classifier>test</classifier>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <classifier>test</classifier>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
new file mode 100644
index 0000000..c7c5929
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * This class provides a {@link KafkaServer} and a dedicated
+ * {@link EmbeddedZookeeper} server for integration testing. Both servers use a
+ * random free port, so it is necessary to use the
+ * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()}
+ * methods to determine how to connect to them.
+ *
+ */
+public class EmbeddedKafkaInstance {
+
+    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
+
+    private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1);
+    private static final String IPv4_LOOPBACK = "127.0.0.1";
+    private static final String ZKHOST = IPv4_LOOPBACK;
+    private static final String BROKERHOST = IPv4_LOOPBACK;
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private String brokerPort;
+    private String zookeperConnect;
+
+    /**
+     * Starts the Embedded Kafka and Zookeeper Servers.
+     * @throws Exception - If an exception occurs during startup.
+     */
+    protected void startup() throws Exception {
+        // Setup the embedded zookeeper
+        logger.info("Starting up Embedded Zookeeper...");
+        zkServer = new EmbeddedZookeeper();
+        zookeperConnect = ZKHOST + ":" + zkServer.port();
+        logger.info("Embedded Zookeeper started at: {}", zookeperConnect);
+
+        // setup Broker
+        logger.info("Starting up Embedded Kafka...");
+        brokerPort = Integer.toString(PortUtils.getRandomFreePort());
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), "0");
+        brokerProps.setProperty(KafkaConfig$.MODULE$.HostNameProp(), BROKERHOST);
+        brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort);
+        brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect);
+        brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString());
+        final KafkaConfig config = new KafkaConfig(brokerProps);
+        final Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+        logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort);
+    }
+
+    /**
+     * Shutdown the Embedded Kafka and Zookeeper.
+     * @throws Exception - If an exception occurs during shutdown.
+     */
+    protected void shutdown() throws Exception {
+        try {
+            if(kafkaServer != null) {
+                kafkaServer.shutdown();
+            }
+        } finally {
+            if(zkServer != null) {
+                zkServer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * @return A new Property object containing the correct value of
+     *         {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}, for
+     *         connecting to this instance.
+     */
+    public Properties createBootstrapServerConfig() {
+        final Properties config = new Properties();
+        config.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + brokerPort);
+        return config;
+    }
+
+    /**
+     *
+     * @return The host of the Kafka Broker.
+     */
+    public String getBrokerHost() {
+        return BROKERHOST;
+    }
+
+    /**
+     *
+     * @return The port of the Kafka Broker.
+     */
+    public String getBrokerPort() {
+        return brokerPort;
+    }
+
+    /**
+     *
+     * @return The Zookeeper Connect String.
+     */
+    public String getZookeeperConnect() {
+        return zookeperConnect;
+    }
+
+    /**
+     *
+     * @return A unique Kafka topic name for this instance.
+     */
+    public String getUniqueTopicName() {
+        return "topic_" + KAFKA_TOPIC_COUNTER.getAndIncrement() + "_";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java
new file mode 100644
index 0000000..3a930ee
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a singleton instance of an {@link EmbeddedKafkaInstance} and
+ * includes a shutdown hook to ensure any open resources are closed on JVM exit.
+ * <p>
+ * This class is derived from MiniAccumuloSingleton.
+ */
+public class EmbeddedKafkaSingleton {
+
+    public static EmbeddedKafkaInstance getInstance() {
+        return InstanceHolder.SINGLETON.instance;
+    }
+
+    private EmbeddedKafkaSingleton() {
+        // hiding implicit default constructor
+    }
+
+    private enum InstanceHolder {
+
+        SINGLETON;
+
+        private final Logger log;
+        private final EmbeddedKafkaInstance instance;
+
+        InstanceHolder() {
+            this.log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
+            this.instance = new EmbeddedKafkaInstance();
+            try {
+                this.instance.startup();
+
+                // JUnit does not have an overall lifecycle event for tearing down
+                // this kind of resource, but shutdown hooks work alright in practice
+                // since this should only be used during testing
+
+                // The only other alternative for lifecycle management is to use a
+                // suite lifecycle to enclose the tests that need this resource.
+                // In practice this becomes unwieldy.
+
+                Runtime.getRuntime().addShutdownHook(new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            InstanceHolder.this.instance.shutdown();
+                        } catch (final Throwable t) {
+                            // logging frameworks will likely be shut down
+                            t.printStackTrace(System.err);
+                        }
+                    }
+                });
+
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Interrupted while starting EmbeddedKafkaInstance", e);
+            } catch (final IOException e) {
+                log.error("Unexpected error while starting EmbeddedKafkaInstance", e);
+            } catch (final Throwable e) {
+                // catching throwable because failure to construct an enum
+                // instance will lead to another error being thrown downstream
+                log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java
new file mode 100644
index 0000000..ddafbcb
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.util.Properties;
+
+/**
+ * A class intended to be extended for Kafka Integration tests.
+ */
+public class KafkaITBase {
+
+    private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
+
+    /**
+     * @return A new Property object containing the correct value for Kafka's
+     *         {@link org.apache.kafka.clients.CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
+     */
+    protected Properties createBootstrapServerConfig() {
+        return embeddedKafka.createBootstrapServerConfig();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
new file mode 100644
index 0000000..5fe3c88
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+
+/**
+ * Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}.
+ *
+ */
+public class KafkaTestInstanceRule extends ExternalResource {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class);
+    private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance();
+    private String kafkaTopicName;
+    private final boolean createTopic;
+
+    /**
+     * @param createTopic - If true, a topic shall be created for the value
+     *            provided by {@link #getKafkaTopicName()}. If false, no topics
+     *            shall be created.
+     */
+    public KafkaTestInstanceRule(final boolean createTopic) {
+        this.createTopic = createTopic;
+    }
+
+    /**
+     * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as
+     *         a prefix.
+     */
+    public String getKafkaTopicName() {
+        if (kafkaTopicName == null) {
+            throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution.");
+        }
+        return kafkaTopicName;
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        // Get the next kafka topic name.
+        kafkaTopicName = kafkaInstance.getUniqueTopicName();
+
+        if(createTopic) {
+            createTopic(kafkaTopicName);
+        }
+    }
+
+    @Override
+    protected void after() {
+        kafkaTopicName = null;
+    }
+
+    /**
+     * Utility method to provide additional unique topics if they are required.
+     * @param topicName - The Kafka topic to create.
+     */
+    public void createTopic(final String topicName) {
+        // Setup Kafka.
+        ZkUtils zkUtils = null;
+        try {
+            logger.info("Creating Kafka Topic: '{}'", topicName);
+            zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
+            AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+        }
+        finally {
+            if(zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java
new file mode 100644
index 0000000..7dad966
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Random;
+
+public class PortUtils {
+
+    private PortUtils() {}
+
+    public static int getRandomFreePort() {
+        final Random r = new Random();
+        int count = 0;
+
+        while (count < 13) {
+            final int port = r.nextInt((1 << 16) - 1024) + 1024;
+
+            try (ServerSocket so = new ServerSocket(port)) {
+                so.setReuseAddress(true);
+                return port;
+            } catch (final IOException e) {
+                // ignore
+            }
+
+            count++;
+        }
+
+        throw new RuntimeException("Unable to find port");
+    }
+}