You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/10/12 18:43:23 UTC
[02/11] incubator-rya git commit: RYA-402 Create Kafka reusable test
code project. Closes #242.
RYA-402 Create Kafka reusable test code project. Closes #242.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/4089e706
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/4089e706
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/4089e706
Branch: refs/heads/master
Commit: 4089e706ca17c54ced3723602672f1a69d2aa2be
Parents: 6dd81bd
Author: kchilton2 <ke...@gmail.com>
Authored: Tue Oct 10 18:36:25 2017 -0400
Committer: jdasch <hc...@gmail.com>
Committed: Thu Oct 12 12:51:38 2017 -0400
----------------------------------------------------------------------
.../pcj/matching/AccumuloIndexSetProvider.java | 3 +-
...dicNotificationApplicationConfiguration.java | 2 +-
.../exporter/KafkaExporterExecutor.java | 2 +-
.../KafkaPeriodicBindingSetExporter.java | 2 +-
.../notification/pruner/AccumuloBinPruner.java | 2 +-
.../pruner/PeriodicQueryPruner.java | 2 +-
extras/periodic.notification/tests/pom.xml | 4 +
.../PeriodicNotificationApplicationIT.java | 288 ++++++++++---------
.../PeriodicNotificationExporterIT.java | 4 +-
.../PeriodicCommandNotificationConsumerIT.java | 38 +--
.../app/batch/AbstractSpanBatchInformation.java | 2 +-
.../fluo/app/batch/JoinBatchInformation.java | 2 +-
.../export/kafka/KafkaExportParameterBase.java | 2 +-
.../rya/kafka/base/EmbeddedKafkaInstance.java | 143 ---------
.../rya/kafka/base/EmbeddedKafkaSingleton.java | 87 ------
.../org/apache/rya/kafka/base/KafkaITBase.java | 38 ---
.../rya/kafka/base/KafkaTestInstanceRule.java | 98 -------
pom.xml | 11 +
test/kafka/pom.xml | 81 ++++++
.../rya/test/kafka/EmbeddedKafkaInstance.java | 142 +++++++++
.../rya/test/kafka/EmbeddedKafkaSingleton.java | 87 ++++++
.../org/apache/rya/test/kafka/KafkaITBase.java | 38 +++
.../rya/test/kafka/KafkaTestInstanceRule.java | 98 +++++++
.../org/apache/rya/test/kafka/PortUtils.java | 44 +++
test/pom.xml | 39 +++
25 files changed, 721 insertions(+), 538 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
index 1940e64..40e2c77 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
@@ -52,11 +52,10 @@ import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.sail.SailException;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import jline.internal.Preconditions;
-
/**
* Implementation of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s.
* This provider uses either user specified Accumulo configuration information or user a specified
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
index d69efe5..ff58979 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
@@ -22,7 +22,7 @@ import java.util.Properties;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* Configuration object for creating a {@link PeriodicNotificationApplication}.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
index c2e5ebf..3b639e9 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
@@ -32,7 +32,7 @@ import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.LifeCycle;
import org.openrdf.query.BindingSet;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* Executor service that runs {@link KafkaPeriodicBindingSetExporter}s.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
index 8a0322f..5397618 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
@@ -36,7 +36,7 @@ import org.apache.rya.periodic.notification.api.BindingSetRecordExportException;
import org.openrdf.model.Literal;
import org.openrdf.query.BindingSet;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* Object that exports {@link BindingSet}s to the Kafka topic indicated by
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
index 4dac64c..a9403c2 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
@@ -24,7 +24,7 @@ import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.NodeBin;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* Deletes BindingSets from time bins in the indicated PCJ table
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
index 516690e..327154a 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
@@ -32,7 +32,7 @@ import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.NodeBin;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* Implementation of {@link BinPruner} that deletes old, already processed
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/pom.xml b/extras/periodic.notification/tests/pom.xml
index 229a761..feb1f0f 100644
--- a/extras/periodic.notification/tests/pom.xml
+++ b/extras/periodic.notification/tests/pom.xml
@@ -26,6 +26,10 @@
<dependencies>
<dependency>
<groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
<artifactId>rya.pcj.fluo.test.base</artifactId>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
index 9109775..3b6062f 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -18,6 +18,9 @@
*/
package org.apache.rya.periodic.notification.application;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -60,14 +63,14 @@ import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
-import org.apache.rya.kafka.base.EmbeddedKafkaInstance;
-import org.apache.rya.kafka.base.EmbeddedKafkaSingleton;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.apache.rya.test.kafka.EmbeddedKafkaInstance;
+import org.apache.rya.test.kafka.EmbeddedKafkaSingleton;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -85,10 +88,7 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
-import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;;
+import com.google.common.collect.Sets;;
public class PeriodicNotificationApplicationIT extends RyaExportITBase {
@@ -101,62 +101,64 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
private PeriodicNotificationApplicationConfiguration conf;
private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
private static String bootstrapServers;
-
+
@Rule
public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
-
+
@BeforeClass
public static void initClass() {
bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
}
-
+
@Before
public void init() throws Exception {
- String topic = rule.getKafkaTopicName();
+ final String topic = rule.getKafkaTopicName();
rule.createTopic(topic);
-
+
//get user specified props and update with the embedded kafka bootstrap servers and rule generated topic
props = getProps();
props.setProperty(NOTIFICATION_TOPIC, topic);
props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
conf = new PeriodicNotificationApplicationConfiguration(props);
-
+
//create Kafka Producer
kafkaProps = getKafkaProperties(conf);
producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());
-
+
//extract kafka specific properties from application config
app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
}
-
+
@Test
public void periodicApplicationWithAggAndGroupByTest() throws Exception {
- String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ final String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ "prefix time: <http://www.w3.org/2006/time#> " // n
+ "select ?type (count(?obs) as ?total) where {" // n
+ "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ "?obs <uri:hasTime> ?time. " // n
+ "?obs <uri:hasObsType> ?type } group by ?type"; // n
-
+
//make data
- int periodMult = 15;
+ final int periodMult = 15;
final ValueFactory vf = new ValueFactoryImpl();
final DatatypeFactory dtf = DatatypeFactory.newInstance();
//Sleep until current time aligns nicely with period to makell
//results more predictable
- while(System.currentTimeMillis() % (periodMult*1000) > 500);
- ZonedDateTime time = ZonedDateTime.now();
+ while(System.currentTimeMillis() % (periodMult*1000) > 500) {
+ ;
+ }
+ final ZonedDateTime time = ZonedDateTime.now();
- ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
- String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+ final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
- ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
- String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+ final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
- ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
- String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+ final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
final Collection<Statement> statements = Sets.newHashSet(
vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
@@ -174,26 +176,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"),
vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile")));
-
+
try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
- Connector connector = ConfigUtils.getConnector(conf);
- PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
- CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
- String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+ final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+ final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
addData(statements);
app.start();
-
- Multimap<Long, BindingSet> actual = HashMultimap.create();
+
+ final Multimap<Long, BindingSet> actual = HashMultimap.create();
try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
consumer.subscribe(Arrays.asList(id));
- long end = System.currentTimeMillis() + 4*periodMult*1000;
+ final long end = System.currentTimeMillis() + 4*periodMult*1000;
long lastBinId = 0L;
long binId = 0L;
- List<Long> ids = new ArrayList<>();
+ final List<Long> ids = new ArrayList<>();
while (System.currentTimeMillis() < end) {
- ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
- for(ConsumerRecord<String, BindingSet> record: records){
- BindingSet result = record.value();
+ final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+ for(final ConsumerRecord<String, BindingSet> record: records){
+ final BindingSet result = record.value();
binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
if(lastBinId != binId) {
lastBinId = binId;
@@ -202,103 +204,105 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
actual.put(binId, result);
}
}
-
- Map<Long, Set<BindingSet>> expected = new HashMap<>();
-
- Set<BindingSet> expected1 = new HashSet<>();
- QueryBindingSet bs1 = new QueryBindingSet();
+
+ final Map<Long, Set<BindingSet>> expected = new HashMap<>();
+
+ final Set<BindingSet> expected1 = new HashSet<>();
+ final QueryBindingSet bs1 = new QueryBindingSet();
bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
bs1.addBinding("type", vf.createLiteral("airplane"));
-
- QueryBindingSet bs2 = new QueryBindingSet();
+
+ final QueryBindingSet bs2 = new QueryBindingSet();
bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
bs2.addBinding("type", vf.createLiteral("ship"));
-
- QueryBindingSet bs3 = new QueryBindingSet();
+
+ final QueryBindingSet bs3 = new QueryBindingSet();
bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
bs3.addBinding("type", vf.createLiteral("automobile"));
-
+
expected1.add(bs1);
expected1.add(bs2);
expected1.add(bs3);
-
- Set<BindingSet> expected2 = new HashSet<>();
- QueryBindingSet bs4 = new QueryBindingSet();
+
+ final Set<BindingSet> expected2 = new HashSet<>();
+ final QueryBindingSet bs4 = new QueryBindingSet();
bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
bs4.addBinding("type", vf.createLiteral("airplane"));
-
- QueryBindingSet bs5 = new QueryBindingSet();
+
+ final QueryBindingSet bs5 = new QueryBindingSet();
bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
bs5.addBinding("type", vf.createLiteral("ship"));
-
+
expected2.add(bs4);
expected2.add(bs5);
-
- Set<BindingSet> expected3 = new HashSet<>();
- QueryBindingSet bs6 = new QueryBindingSet();
+
+ final Set<BindingSet> expected3 = new HashSet<>();
+ final QueryBindingSet bs6 = new QueryBindingSet();
bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
bs6.addBinding("type", vf.createLiteral("ship"));
-
- QueryBindingSet bs7 = new QueryBindingSet();
+
+ final QueryBindingSet bs7 = new QueryBindingSet();
bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
bs7.addBinding("type", vf.createLiteral("airplane"));
-
+
expected3.add(bs6);
expected3.add(bs7);
-
+
expected.put(ids.get(0), expected1);
expected.put(ids.get(1), expected2);
expected.put(ids.get(2), expected3);
-
+
Assert.assertEquals(3, actual.asMap().size());
- for(Long ident: ids) {
+ for(final Long ident: ids) {
Assert.assertEquals(expected.get(ident), actual.get(ident));
}
}
-
- Set<BindingSet> expectedResults = new HashSet<>();
+
+ final Set<BindingSet> expectedResults = new HashSet<>();
try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
results.forEachRemaining(x -> expectedResults.add(x));
Assert.assertEquals(0, expectedResults.size());
}
}
}
-
-
+
+
@Test
public void periodicApplicationWithAggTest() throws Exception {
- String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ final String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ "prefix time: <http://www.w3.org/2006/time#> " // n
+ "select (count(?obs) as ?total) where {" // n
+ "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ "?obs <uri:hasTime> ?time. " // n
+ "?obs <uri:hasId> ?id } "; // n
-
+
//make data
- int periodMult = 15;
+ final int periodMult = 15;
final ValueFactory vf = new ValueFactoryImpl();
final DatatypeFactory dtf = DatatypeFactory.newInstance();
//Sleep until current time aligns nicely with period to make
//results more predictable
- while(System.currentTimeMillis() % (periodMult*1000) > 500);
- ZonedDateTime time = ZonedDateTime.now();
+ while(System.currentTimeMillis() % (periodMult*1000) > 500) {
+ ;
+ }
+ final ZonedDateTime time = ZonedDateTime.now();
- ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
- String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+ final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
- ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
- String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+ final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
- ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
- String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+ final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
final Collection<Statement> statements = Sets.newHashSet(
vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
@@ -310,26 +314,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
-
+
try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
- Connector connector = ConfigUtils.getConnector(conf);
- PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
- CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
- String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+ final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+ final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
addData(statements);
app.start();
-
- Multimap<Long, BindingSet> expected = HashMultimap.create();
+
+ final Multimap<Long, BindingSet> expected = HashMultimap.create();
try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
consumer.subscribe(Arrays.asList(id));
- long end = System.currentTimeMillis() + 4*periodMult*1000;
+ final long end = System.currentTimeMillis() + 4*periodMult*1000;
long lastBinId = 0L;
long binId = 0L;
- List<Long> ids = new ArrayList<>();
+ final List<Long> ids = new ArrayList<>();
while (System.currentTimeMillis() < end) {
- ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
- for(ConsumerRecord<String, BindingSet> record: records){
- BindingSet result = record.value();
+ final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+ for(final ConsumerRecord<String, BindingSet> record: records){
+ final BindingSet result = record.value();
binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
if(lastBinId != binId) {
lastBinId = binId;
@@ -338,21 +342,21 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
expected.put(binId, result);
}
}
-
+
Assert.assertEquals(3, expected.asMap().size());
int i = 0;
- for(Long ident: ids) {
+ for(final Long ident: ids) {
Assert.assertEquals(1, expected.get(ident).size());
- BindingSet bs = expected.get(ident).iterator().next();
- Value val = bs.getValue("total");
- int total = Integer.parseInt(val.stringValue());
+ final BindingSet bs = expected.get(ident).iterator().next();
+ final Value val = bs.getValue("total");
+ final int total = Integer.parseInt(val.stringValue());
Assert.assertEquals(3-i, total);
i++;
}
}
-
-
- Set<BindingSet> expectedResults = new HashSet<>();
+
+
+ final Set<BindingSet> expectedResults = new HashSet<>();
try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
results.forEachRemaining(x -> expectedResults.add(x));
Assert.assertEquals(0, expectedResults.size());
@@ -360,35 +364,37 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
}
}
-
-
+
+
@Test
public void periodicApplicationTest() throws Exception {
- String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ final String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ "prefix time: <http://www.w3.org/2006/time#> " // n
+ "select ?obs ?id where {" // n
+ "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ "?obs <uri:hasTime> ?time. " // n
+ "?obs <uri:hasId> ?id } "; // n
-
+
//make data
- int periodMult = 15;
+ final int periodMult = 15;
final ValueFactory vf = new ValueFactoryImpl();
final DatatypeFactory dtf = DatatypeFactory.newInstance();
//Sleep until current time aligns nicely with period to make
//results more predictable
- while(System.currentTimeMillis() % (periodMult*1000) > 500);
- ZonedDateTime time = ZonedDateTime.now();
+ while(System.currentTimeMillis() % (periodMult*1000) > 500) {
+ ;
+ }
+ final ZonedDateTime time = ZonedDateTime.now();
- ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
- String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+ final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
- ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
- String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+ final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
- ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
- String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+ final ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+ final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
final Collection<Statement> statements = Sets.newHashSet(
vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
@@ -400,26 +406,26 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
-
+
try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
- Connector connector = ConfigUtils.getConnector(conf);
- PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
- CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
- String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+ final Connector connector = ConfigUtils.getConnector(conf);
+ final PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+ final CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+ final String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
addData(statements);
app.start();
-
- Multimap<Long, BindingSet> expected = HashMultimap.create();
+
+ final Multimap<Long, BindingSet> expected = HashMultimap.create();
try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
consumer.subscribe(Arrays.asList(id));
- long end = System.currentTimeMillis() + 4*periodMult*1000;
+ final long end = System.currentTimeMillis() + 4*periodMult*1000;
long lastBinId = 0L;
long binId = 0L;
- List<Long> ids = new ArrayList<>();
+ final List<Long> ids = new ArrayList<>();
while (System.currentTimeMillis() < end) {
- ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
- for(ConsumerRecord<String, BindingSet> record: records){
- BindingSet result = record.value();
+ final ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+ for(final ConsumerRecord<String, BindingSet> record: records){
+ final BindingSet result = record.value();
binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
if(lastBinId != binId) {
lastBinId = binId;
@@ -428,17 +434,17 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
expected.put(binId, result);
}
}
-
+
Assert.assertEquals(3, expected.asMap().size());
int i = 0;
- for(Long ident: ids) {
+ for(final Long ident: ids) {
Assert.assertEquals(3-i, expected.get(ident).size());
i++;
}
}
-
-
- Set<BindingSet> expectedResults = new HashSet<>();
+
+
+ final Set<BindingSet> expectedResults = new HashSet<>();
try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
results.forEachRemaining(x -> expectedResults.add(x));
Assert.assertEquals(0, expectedResults.size());
@@ -446,40 +452,40 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
}
}
-
-
+
+
@After
public void shutdown() {
registrar.close();
app.stop();
}
-
- private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
+
+ private void addData(final Collection<Statement> statements) throws DatatypeConfigurationException {
// add statements to Fluo
try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
- InsertTriples inserter = new InsertTriples();
+ final InsertTriples inserter = new InsertTriples();
statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
getMiniFluo().waitForObservers();
}
}
- private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
- Properties kafkaProps = new Properties();
+ private static Properties getKafkaProperties(final PeriodicNotificationApplicationConfiguration conf) {
+ final Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return kafkaProps;
}
-
+
private Properties getProps() throws IOException {
-
- Properties props = new Properties();
+
+ final Properties props = new Properties();
try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) {
props.load(in);
- }
-
- FluoConfiguration fluoConf = getFluoConfiguration();
+ }
+
+ final FluoConfiguration fluoConf = getFluoConfiguration();
props.setProperty("accumulo.user", getUsername());
props.setProperty("accumulo.password", getPassword());
props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
index 874e7e2..82338b9 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -34,10 +34,10 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.kafka.base.KafkaITBase;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
index 522e69d..1fb6167 100644
--- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
@@ -30,13 +30,13 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.BasicConfigurator;
-import org.apache.rya.kafka.base.KafkaITBase;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -48,10 +48,10 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
private PeriodicNotificationCoordinatorExecutor coord;
private KafkaNotificationProvider provider;
private String bootstrapServer;
-
+
@Rule
public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
-
+
@Before
public void init() throws Exception {
bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
@@ -62,12 +62,12 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
BasicConfigurator.configure();
- BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
- Properties props = createKafkaConfig();
- KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
- String topic = rule.getKafkaTopicName();
+ final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ final Properties props = createKafkaConfig();
+ final KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+ final String topic = rule.getKafkaTopicName();
rule.createTopic(topic);
-
+
registration = new KafkaNotificationRegistrationClient(topic, producer);
coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -80,11 +80,11 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
registration.deleteNotification("1");
Thread.sleep(2000);
- int size = notifications.size();
+ final int size = notifications.size();
// sleep for 2 seconds to ensure no more messages being produced
Thread.sleep(2000);
Assert.assertEquals(size, notifications.size());
-
+
tearDown();
}
@@ -93,12 +93,12 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
BasicConfigurator.configure();
- BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
- Properties props = createKafkaConfig();
- KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
- String topic = rule.getKafkaTopicName();
+ final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+ final Properties props = createKafkaConfig();
+ final KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+ final String topic = rule.getKafkaTopicName();
rule.createTopic(topic);
-
+
registration = new KafkaNotificationRegistrationClient(topic, producer);
coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -111,11 +111,11 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
registration.deleteNotification("1");
Thread.sleep(2000);
- int size = notifications.size();
+ final int size = notifications.size();
// sleep for 2 seconds to ensure no more messages being produced
Thread.sleep(2000);
Assert.assertEquals(size, notifications.size());
-
+
tearDown();
}
@@ -126,7 +126,7 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
}
private Properties createKafkaConfig() {
- Properties props = new Properties();
+ final Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
index 498dd85..4933d57 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
@@ -23,7 +23,7 @@ import java.util.Objects;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* Abstract class for generating span based notifications. A spanned notification
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
index d049ff0..3354fdc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -27,7 +27,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.openrdf.query.Binding;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* This class updates join results based on parameters specified for the join's
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
index aab3929..8686c85 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
@@ -26,7 +26,7 @@ import org.apache.fluo.api.observer.Observer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
/**
* Provides read/write functions to the parameters map that is passed into an
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
deleted file mode 100644
index 97d8b90..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaInstance.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.nio.file.Files;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.fluo.core.util.PortUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaConfig$;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.zk.EmbeddedZookeeper;
-
-/**
- * This class provides a {@link KafkaServer} and a dedicated
- * {@link EmbeddedZookeeper} server for integtration testing. Both servers use a
- * random free port, so it is necesssary to use the
- * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()}
- * methods to determine how to connect to them.
- *
- */
-public class EmbeddedKafkaInstance {
-
- private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
-
- private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1);
- private static final String IPv4_LOOPBACK = "127.0.0.1";
- private static final String ZKHOST = IPv4_LOOPBACK;
- private static final String BROKERHOST = IPv4_LOOPBACK;
- private KafkaServer kafkaServer;
- private EmbeddedZookeeper zkServer;
- private String brokerPort;
- private String zookeperConnect;
-
- /**
- * Starts the Embedded Kafka and Zookeeper Servers.
- * @throws Exception - If an exeption occurs during startup.
- */
- protected void startup() throws Exception {
- // Setup the embedded zookeeper
- logger.info("Starting up Embedded Zookeeper...");
- zkServer = new EmbeddedZookeeper();
- zookeperConnect = ZKHOST + ":" + zkServer.port();
- logger.info("Embedded Zookeeper started at: {}", zookeperConnect);
-
- // setup Broker
- logger.info("Starting up Embedded Kafka...");
- brokerPort = Integer.toString(PortUtils.getRandomFreePort());
- final Properties brokerProps = new Properties();
- brokerProps.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), "0");
- brokerProps.setProperty(KafkaConfig$.MODULE$.HostNameProp(), BROKERHOST);
- brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort);
- brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect);
- brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString());
- final KafkaConfig config = new KafkaConfig(brokerProps);
- final Time mock = new MockTime();
- kafkaServer = TestUtils.createServer(config, mock);
- logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort);
- }
-
- /**
- * Shutdown the Embedded Kafka and Zookeeper.
- * @throws Exception
- */
- protected void shutdown() throws Exception {
- try {
- if(kafkaServer != null) {
- kafkaServer.shutdown();
- }
- } finally {
- if(zkServer != null) {
- zkServer.shutdown();
- }
- }
- }
-
- /**
- * @return A new Property object containing the correct value of
- * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}, for
- * connecting to this instance.
- */
- public Properties createBootstrapServerConfig() {
- final Properties config = new Properties();
- config.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + brokerPort);
- return config;
- }
-
- /**
- *
- * @return The host of the Kafka Broker.
- */
- public String getBrokerHost() {
- return BROKERHOST;
- }
-
- /**
- *
- * @return The port of the Kafka Broker.
- */
- public String getBrokerPort() {
- return brokerPort;
- }
-
- /**
- *
- * @return The Zookeeper Connect String.
- */
- public String getZookeeperConnect() {
- return zookeperConnect;
- }
-
- /**
- *
- * @return A unique Kafka topic name for this instance.
- */
- public String getUniqueTopicName() {
- return "topic_" + KAFKA_TOPIC_COUNTER.getAndIncrement() + "_";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
deleted file mode 100644
index 933377b..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/EmbeddedKafkaSingleton.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides a singleton instance of an {@link EmbeddedKafkaInstance} and
- * includes a shutdown hook to ensure any open resources are closed on JVM exit.
- * <p>
- * This class is derived from MiniAccumuloSingleton.
- */
-public class EmbeddedKafkaSingleton {
-
- public static EmbeddedKafkaInstance getInstance() {
- return InstanceHolder.SINGLETON.instance;
- }
-
- private EmbeddedKafkaSingleton() {
- // hiding implicit default constructor
- }
-
- private enum InstanceHolder {
-
- SINGLETON;
-
- private final Logger log;
- private final EmbeddedKafkaInstance instance;
-
- InstanceHolder() {
- this.log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
- this.instance = new EmbeddedKafkaInstance();
- try {
- this.instance.startup();
-
- // JUnit does not have an overall lifecycle event for tearing down
- // this kind of resource, but shutdown hooks work alright in practice
- // since this should only be used during testing
-
- // The only other alternative for lifecycle management is to use a
- // suite lifecycle to enclose the tests that need this resource.
- // In practice this becomes unwieldy.
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- InstanceHolder.this.instance.shutdown();
- } catch (final Throwable t) {
- // logging frameworks will likely be shut down
- t.printStackTrace(System.err);
- }
- }
- });
-
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Interrupted while starting EmbeddedKafkaInstance", e);
- } catch (final IOException e) {
- log.error("Unexpected error while starting EmbeddedKafkaInstance", e);
- } catch (final Throwable e) {
- // catching throwable because failure to construct an enum
- // instance will lead to another error being thrown downstream
- log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
deleted file mode 100644
index da4526c..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.util.Properties;
-
-/**
- * A class intended to be extended for Kafka Integration tests.
- */
-public class KafkaITBase {
-
- private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
-
- /**
- * @return A new Property object containing the correct value for Kafka's
- * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
- */
- protected Properties createBootstrapServerConfig() {
- return embeddedKafka.createBootstrapServerConfig();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
deleted file mode 100644
index a9ee7b5..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaTestInstanceRule.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.kafka.base;
-
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.rules.ExternalResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-
-
-/**
- * Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}.
- *
- */
-public class KafkaTestInstanceRule extends ExternalResource {
- private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class);
- private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance();
- private String kafkaTopicName;
- private final boolean createTopic;
-
- /**
- * @param createTopic - If true, a topic shall be created for the value
- * provided by {@link #getKafkaTopicName()}. If false, no topics
- * shall be created.
- */
- public KafkaTestInstanceRule(final boolean createTopic) {
- this.createTopic = createTopic;
- }
-
- /**
- * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as
- * a prefix.
- */
- public String getKafkaTopicName() {
- if (kafkaTopicName == null) {
- throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution.");
- }
- return kafkaTopicName;
- }
-
- @Override
- protected void before() throws Throwable {
- // Get the next kafka topic name.
- kafkaTopicName = kafkaInstance.getUniqueTopicName();
-
- if(createTopic) {
- createTopic(kafkaTopicName);
- }
- }
-
- @Override
- protected void after() {
- kafkaTopicName = null;
- }
-
- /**
- * Utility method to provide additional unique topics if they are required.
- * @param topicName - The Kafka topic to create.
- */
- public void createTopic(final String topicName) {
- // Setup Kafka.
- ZkUtils zkUtils = null;
- try {
- logger.info("Creating Kafka Topic: '{}'", topicName);
- zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
- AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
- }
- finally {
- if(zkUtils != null) {
- zkUtils.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92cfba8..3dc7c68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@ under the License.
<module>pig</module>
<module>sail</module>
<module>spark</module>
+ <module>test</module>
<module>web</module>
</modules>
<properties>
@@ -280,6 +281,16 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.parent</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/test/kafka/pom.xml b/test/kafka/pom.xml
new file mode 100644
index 0000000..44773a7
--- /dev/null
+++ b/test/kafka/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.test.parent</artifactId>
+ <version>3.2.12-incubating-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.test.kafka</artifactId>
+
+ <name>Apache Rya Test Kafka</name>
+ <description>
+ This module contains the Rya Test Kafka components that help write Kafka
+ based integration tests.
+ </description>
+
+ <dependencies>
+ <!-- Kafka dependencies. -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <classifier>test</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <classifier>test</classifier>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
new file mode 100644
index 0000000..c7c5929
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * Runs an in-process {@link KafkaServer} together with a dedicated
+ * {@link EmbeddedZookeeper} for integration testing. The ports are only known
+ * once {@link #startup()} has run, so it is necessary to use
+ * {@link #getZookeeperConnect()} and {@link #createBootstrapServerConfig()}
+ * to determine how to connect to the servers.
+ * <p>
+ * {@link #getBrokerPort()} and {@link #getZookeeperConnect()} return
+ * {@code null} until {@link #startup()} has been invoked.
+ */
+public class EmbeddedKafkaInstance {
+
+    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
+
+    /** Sequence used by {@link #getUniqueTopicName()}; static, so it is shared by every instance. */
+    private static final AtomicInteger KAFKA_TOPIC_COUNTER = new AtomicInteger(1);
+    private static final String IPv4_LOOPBACK = "127.0.0.1";
+    private static final String ZKHOST = IPv4_LOOPBACK;
+    private static final String BROKERHOST = IPv4_LOOPBACK;
+
+    private EmbeddedZookeeper zkServer;
+    private String zookeeperConnect;
+    private KafkaServer kafkaServer;
+    private String brokerPort;
+
+    /**
+     * Starts the embedded Zookeeper server and then the embedded Kafka broker.
+     * @throws Exception - If an exception occurs during startup.
+     */
+    protected void startup() throws Exception {
+        // The broker registers itself with Zookeeper, so Zookeeper goes first.
+        startZookeeper();
+        startBroker();
+    }
+
+    /**
+     * Creates the embedded Zookeeper and records its connect string.
+     */
+    private void startZookeeper() {
+        logger.info("Starting up Embedded Zookeeper...");
+        zkServer = new EmbeddedZookeeper();
+        zookeeperConnect = ZKHOST + ":" + zkServer.port();
+        logger.info("Embedded Zookeeper started at: {}", zookeeperConnect);
+    }
+
+    /**
+     * Picks a free port, builds the broker configuration and starts the broker.
+     * @throws Exception - If the log directory cannot be created or the broker fails to start.
+     */
+    private void startBroker() throws Exception {
+        logger.info("Starting up Embedded Kafka...");
+        brokerPort = Integer.toString(PortUtils.getRandomFreePort());
+
+        // Every startup gets its own temporary directory for the broker's logs.
+        final String logDirs = Files.createTempDirectory(getClass().getSimpleName() + "-")
+                .toAbsolutePath()
+                .toString();
+
+        final KafkaConfig$ keys = KafkaConfig$.MODULE$;
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(keys.BrokerIdProp(), "0");
+        brokerProps.setProperty(keys.HostNameProp(), BROKERHOST);
+        brokerProps.setProperty(keys.PortProp(), brokerPort);
+        brokerProps.setProperty(keys.ZkConnectProp(), zookeeperConnect);
+        brokerProps.setProperty(keys.LogDirsProp(), logDirs);
+
+        final KafkaConfig brokerConfig = new KafkaConfig(brokerProps);
+        final Time brokerClock = new MockTime();
+        kafkaServer = TestUtils.createServer(brokerConfig, brokerClock);
+        logger.info("Embedded Kafka Server started at: {}:{}", BROKERHOST, brokerPort);
+    }
+
+    /**
+     * Shuts down the embedded Kafka broker and then the embedded Zookeeper.
+     * Servers that were never started are skipped, and Zookeeper is shut down
+     * even when stopping the broker fails.
+     * @throws Exception - If an exception occurs while stopping either server.
+     */
+    protected void shutdown() throws Exception {
+        try {
+            if (kafkaServer != null) {
+                kafkaServer.shutdown();
+            }
+        } finally {
+            if (zkServer != null) {
+                zkServer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * @return A new {@link Properties} object holding only the
+     *         {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG} entry
+     *         needed to connect to this instance.
+     */
+    public Properties createBootstrapServerConfig() {
+        final String bootstrapServers = BROKERHOST + ":" + brokerPort;
+        final Properties clientProps = new Properties();
+        clientProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return clientProps;
+    }
+
+    /**
+     * @return The host of the Kafka Broker.
+     */
+    public String getBrokerHost() {
+        return BROKERHOST;
+    }
+
+    /**
+     * @return The port of the Kafka Broker, or {@code null} before startup.
+     */
+    public String getBrokerPort() {
+        return brokerPort;
+    }
+
+    /**
+     * @return The Zookeeper Connect String, or {@code null} before startup.
+     */
+    public String getZookeeperConnect() {
+        return zookeeperConnect;
+    }
+
+    /**
+     * @return A Kafka topic name of the form {@code topic_<n>_}, where
+     *         {@code <n>} comes from a counter that is advanced on every call.
+     */
+    public String getUniqueTopicName() {
+        final int sequence = KAFKA_TOPIC_COUNTER.getAndIncrement();
+        return "topic_" + sequence + "_";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java
new file mode 100644
index 0000000..3a930ee
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaSingleton.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the single, lazily started {@link EmbeddedKafkaInstance} shared by the
+ * JVM and registers a shutdown hook so its resources are released on JVM exit.
+ * <p>
+ * This class is derived from MiniAccumuloSingleton.
+ */
+public class EmbeddedKafkaSingleton {
+
+    private EmbeddedKafkaSingleton() {
+        // static access only; never instantiated
+    }
+
+    /**
+     * @return The shared {@link EmbeddedKafkaInstance}. Startup failures are
+     *         only logged, so the returned instance may not have finished
+     *         starting if an error occurred.
+     */
+    public static EmbeddedKafkaInstance getInstance() {
+        return InstanceHolder.SINGLETON.instance;
+    }
+
+    /**
+     * Enum-based holder: the instance is created and started when the enum
+     * constant is first initialized.
+     */
+    private enum InstanceHolder {
+
+        SINGLETON;
+
+        private final Logger log;
+        private final EmbeddedKafkaInstance instance;
+
+        InstanceHolder() {
+            // Note: messages are logged under the EmbeddedKafkaInstance logger name.
+            log = LoggerFactory.getLogger(EmbeddedKafkaInstance.class);
+            instance = new EmbeddedKafkaInstance();
+            try {
+                instance.startup();
+
+                // JUnit offers no global teardown event for a resource like this.
+                // Wrapping every dependent test in a suite would be the
+                // alternative, but that gets unwieldy; a shutdown hook is good
+                // enough because this is only ever used while testing.
+                Runtime.getRuntime().addShutdownHook(newShutdownHook());
+
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Interrupted while starting EmbeddedKafkaInstance", e);
+            } catch (final IOException e) {
+                log.error("Unexpected error while starting EmbeddedKafkaInstance", e);
+            } catch (final Throwable e) {
+                // Letting anything escape would make enum initialization fail,
+                // which only surfaces later as a different, less helpful error.
+                log.error("Unexpected throwable while starting EmbeddedKafkaInstance", e);
+            }
+        }
+
+        /**
+         * @return A thread that shuts the instance down and reports any
+         *         failure on standard error.
+         */
+        private Thread newShutdownHook() {
+            return new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        instance.shutdown();
+                    } catch (final Throwable t) {
+                        // logging frameworks will likely be shut down
+                        t.printStackTrace(System.err);
+                    }
+                }
+            });
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java
new file mode 100644
index 0000000..ddafbcb
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaITBase.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.util.Properties;
+
+/**
+ * A class intended to be extended by Kafka integration tests. It exposes the
+ * connection settings of the shared {@link EmbeddedKafkaInstance} obtained
+ * from {@link EmbeddedKafkaSingleton#getInstance()}.
+ */
+public class KafkaITBase {
+
+    /** The shared embedded Kafka instance; fetched once when this class is initialized. */
+    private static final EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
+
+    /**
+     * @return A new {@link Properties} object containing the correct value for Kafka's
+     *         {@link org.apache.kafka.clients.CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
+     */
+    protected Properties createBootstrapServerConfig() {
+        return embeddedKafka.createBootstrapServerConfig();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
new file mode 100644
index 0000000..5fe3c88
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+
+/**
+ * A JUnit Rule that hands each test execution a fresh topic name taken from
+ * the {@link EmbeddedKafkaSingleton} instance and, optionally, creates that
+ * topic before the test runs.
+ */
+public class KafkaTestInstanceRule extends ExternalResource {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class);
+    private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance();
+
+    /** Value passed for both timeout arguments when opening the Zookeeper client. */
+    private static final int ZK_TIMEOUT = 30000;
+
+    private final boolean createTopic;
+
+    /** Topic name for the running test; {@code null} outside of a test execution. */
+    private String kafkaTopicName;
+
+    /**
+     * @param createTopic - If true, a topic shall be created for the value
+     *            provided by {@link #getKafkaTopicName()}. If false, no topics
+     *            shall be created.
+     */
+    public KafkaTestInstanceRule(final boolean createTopic) {
+        this.createTopic = createTopic;
+    }
+
+    /**
+     * @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as
+     *         a prefix.
+     * @throws IllegalStateException If called outside of a test execution.
+     */
+    public String getKafkaTopicName() {
+        final String current = kafkaTopicName;
+        if (current != null) {
+            return current;
+        }
+        throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution.");
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        // Reserve the name first so it is available even when no topic is created.
+        final String nextTopic = kafkaInstance.getUniqueTopicName();
+        kafkaTopicName = nextTopic;
+
+        if (!createTopic) {
+            return;
+        }
+        createTopic(nextTopic);
+    }
+
+    @Override
+    protected void after() {
+        // Forget the name so stale values cannot leak out of the test execution.
+        kafkaTopicName = null;
+    }
+
+    /**
+     * Utility method to provide additional unique topics if they are required.
+     * The topic is created through a short-lived Zookeeper connection to the
+     * embedded instance, which is closed again before returning.
+     * @param topicName - The Kafka topic to create.
+     */
+    public void createTopic(final String topicName) {
+        ZkUtils zk = null;
+        try {
+            logger.info("Creating Kafka Topic: '{}'", topicName);
+            final ZkClient zkClient = new ZkClient(
+                    kafkaInstance.getZookeeperConnect(),
+                    ZK_TIMEOUT,
+                    ZK_TIMEOUT,
+                    ZKStringSerializer$.MODULE$);
+            zk = ZkUtils.apply(zkClient, false);
+
+            final Properties topicConfig = new Properties();
+            AdminUtils.createTopic(zk, topicName, 1, 1, topicConfig, RackAwareMode.Disabled$.MODULE$);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/4089e706/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java
----------------------------------------------------------------------
diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java
new file mode 100644
index 0000000..7dad966
--- /dev/null
+++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/PortUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.rya.test.kafka;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Random;
+
+/**
+ * Utility for finding a TCP port that a test server can bind to.
+ */
+public class PortUtils {
+
+    private PortUtils() {}
+
+    /**
+     * Finds a TCP port that is currently free by binding a throw-away
+     * {@link ServerSocket} to a system-chosen port and releasing it again.
+     * <p>
+     * The port is only guaranteed to have been free while the probe socket
+     * was open; callers should bind to it promptly.
+     *
+     * @return A port number that was free at the time of the call.
+     * @throws RuntimeException If no port could be bound; the underlying
+     *             {@link IOException} is attached as the cause.
+     */
+    public static int getRandomFreePort() {
+        try (ServerSocket probe = new ServerSocket()) {
+            // SO_REUSEADDR has to be configured before the socket is bound;
+            // enabling it afterwards has no defined effect.
+            probe.setReuseAddress(true);
+
+            // A null endpoint asks the system to pick a free ephemeral port.
+            probe.bind(null);
+            return probe.getLocalPort();
+        } catch (final IOException e) {
+            throw new RuntimeException("Unable to find port", e);
+        }
+    }
+}