You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/04 13:14:06 UTC

[2/4] incubator-rya git commit: RYA-319-Integration of Periodic Query with CLI. Closes #220.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
new file mode 100644
index 0000000..bb438be
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Implementation of {@link PeriodicNotificationClient} used to register new notification
+ * requests with the PeriodicQueryService. Every request is wrapped in a
+ * {@link CommandNotification} and written to the configured Kafka topic, keyed by the
+ * id of the command notification.
+ */
+public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient {
+
+    private final KafkaProducer<String, CommandNotification> producer;
+    private final String topic;
+
+    /**
+     * Creates a client that writes command notifications to the given topic.
+     *
+     * @param topic - name of the Kafka topic that command notifications are sent to (not null)
+     * @param producer - producer used to send the command notifications (not null)
+     * @throws NullPointerException if topic or producer is null
+     */
+    public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) {
+        // Fail at construction time instead of on the first send()/close() call.
+        this.topic = java.util.Objects.requireNonNull(topic, "topic");
+        this.producer = java.util.Objects.requireNonNull(producer, "producer");
+    }
+
+    /**
+     * Sends an ADD command for the given notification.
+     */
+    @Override
+    public void addNotification(PeriodicNotification notification) {
+        processNotification(new CommandNotification(Command.ADD, notification));
+    }
+
+    /**
+     * Sends a DELETE command for the given notification.
+     */
+    @Override
+    public void deleteNotification(BasicNotification notification) {
+        processNotification(new CommandNotification(Command.DELETE, notification));
+    }
+
+    /**
+     * Sends a DELETE command for a {@link BasicNotification} built from the given id.
+     */
+    @Override
+    public void deleteNotification(String notificationId) {
+        processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId)));
+    }
+
+    /**
+     * Builds a {@link PeriodicNotification} from the given values and sends an ADD command for it.
+     *
+     * @param id - id of the notification
+     * @param period - period of the notification
+     * @param delay - initial delay of the notification
+     * @param unit - time unit passed to the notification builder
+     */
+    @Override
+    public void addNotification(String id, long period, long delay, TimeUnit unit) {
+        Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
+        processNotification(new CommandNotification(Command.ADD, notification));
+    }
+
+    /**
+     * Hands the command to the producer, using the id of the command notification as the record key.
+     * The result of the send is not inspected here.
+     */
+    private void processNotification(CommandNotification notification) {
+        producer.send(new ProducerRecord<>(topic, notification.getId(), notification));
+    }
+
+    /**
+     * Closes the underlying producer.
+     */
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
new file mode 100644
index 0000000..bd29d29
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link BasicNotification}s.  A notification is written as a
+ * JSON object with a single "id" property.  {@link CommandNotificationTypeAdapter} delegates to this
+ * class when the notification wrapped by a {@code CommandNotification} is a {@link BasicNotification}.
+ *
+ */
+public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> {
+
+    /**
+     * Reads the "id" property of the given JSON object and wraps it in a {@link BasicNotification}.
+     */
+    @Override
+    public BasicNotification deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+        final JsonElement id = element.getAsJsonObject().get("id");
+        return new BasicNotification(id.getAsString());
+    }
+
+    /**
+     * Writes the notification as a JSON object whose only property is "id".
+     */
+    @Override
+    public JsonElement serialize(BasicNotification notification, Type typeOfSrc, JsonSerializationContext context) {
+        final JsonObject json = new JsonObject();
+        final JsonPrimitive id = new JsonPrimitive(notification.getId());
+        json.add("id", id);
+        return json;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
new file mode 100644
index 0000000..50180ad
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.primitives.Bytes;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming
+ * {@link BindingSet}s to and from Kafka.
+ * <p>
+ * Wire format: the variable order joined with ";" and encoded as UTF-8, one delimiter
+ * byte (0x02), then the bytes produced by {@link AccumuloPcjSerializer} for that variable order.
+ *
+ */
+public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> {
+
+    private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
+    private static final AccumuloPcjSerializer serializer =  new AccumuloPcjSerializer();
+    // The byte for "\u0002", written out explicitly so that it does not depend on the
+    // platform default charset the way String.getBytes() does.
+    private static final byte[] DELIM_BYTE = { 0x02 };
+
+    /**
+     * Serializes the binding set, using its binding names as the variable order.
+     *
+     * @return the serialized bytes, or an empty array if the binding set could not be serialized
+     */
+    private byte[] toBytes(BindingSet bindingSet) {
+        try {
+            return getBytes(getVarOrder(bindingSet), bindingSet);
+        } catch(Exception e) {
+            // Best effort: report the failure together with its cause and hand back an empty payload.
+            log.warn("Unable to serialize BindingSet: " + bindingSet, e);
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Splits the payload at the first delimiter into the variable order and the binding set bytes,
+     * then rebuilds the binding set.
+     *
+     * @return the deserialized binding set, or an empty {@link QueryBindingSet} if the payload is
+     *         null, has no delimiter, or cannot be converted
+     */
+    private BindingSet fromBytes(byte[] bsBytes) {
+        try {
+            int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
+            // A missing delimiter gives -1; the copy below then throws and is handled by the catch.
+            byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
+            byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + DELIM_BYTE.length, bsBytes.length);
+            VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+            return getBindingSet(varOrder, bsBytesNoVarOrder);
+        } catch(Exception e) {
+            // Arrays.toString prints the actual content; concatenating the array would only print its identity.
+            log.warn("Unable to deserialize BindingSet: " + Arrays.toString(bsBytes), e);
+            return new QueryBindingSet();
+        }
+    }
+
+    /**
+     * Builds a variable order from the binding names of the given binding set.
+     */
+    private VariableOrder getVarOrder(BindingSet bs) {
+        return new VariableOrder(bs.getBindingNames());
+    }
+
+    /**
+     * Produces the wire format: UTF-8 variable order, delimiter, converted binding set bytes.
+     */
+    private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
+        byte[] bsBytes = serializer.convert(bs, varOrder);
+        String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
+        byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+        return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
+    }
+
+    /**
+     * Converts the binding set bytes back into a binding set using the given variable order.
+     */
+    private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException {
+        return serializer.convert(bsBytes, varOrder);
+    }
+
+    @Override
+    public BindingSet deserialize(String topic, byte[] bytes) {
+        return fromBytes(bytes);
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close.
+    }
+
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing.  Nothing to configure.
+    }
+
+    @Override
+    public byte[] serialize(String topic, BindingSet bs) {
+        return toBytes(bs);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
new file mode 100644
index 0000000..302e1be
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s
+ * to and from Kafka.  Notifications are written as UTF-8 encoded JSON produced by a {@link Gson}
+ * instance configured with {@link CommandNotificationTypeAdapter}.
+ *
+ */
+public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> {
+
+    private static final Gson gson = new GsonBuilder()
+            .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create();
+    private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class);
+
+    /**
+     * Decodes the bytes as UTF-8 JSON and parses them into a {@link CommandNotification}.
+     *
+     * @param topic - topic the record was read from (only used in the error message)
+     * @param bytes - serialized notification; may be null
+     * @return the parsed notification, or null if bytes is null
+     * @throws RuntimeException if the UTF-8 encoding is not supported
+     */
+    @Override
+    public CommandNotification deserialize(String topic, byte[] bytes) {
+        if (bytes == null) {
+            // Nothing to decode; return null rather than failing with a NullPointerException.
+            return null;
+        }
+        try {
+            return gson.fromJson(new String(bytes, "UTF-8"), CommandNotification.class);
+        } catch (UnsupportedEncodingException e) {
+            // Same handling as serialize(): report and rethrow instead of continuing with no JSON.
+            LOG.error("Unable to deserialize notification for topic: " + topic, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Writes the notification as JSON and encodes it as UTF-8.
+     *
+     * @param topic - topic the record is written to (only used in the error message)
+     * @param command - notification to serialize
+     * @return the UTF-8 encoded JSON
+     * @throws RuntimeException if the UTF-8 encoding is not supported
+     */
+    @Override
+    public byte[] serialize(String topic, CommandNotification command) {
+        try {
+            return gson.toJson(command).getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("Unable to serialize notification: " + command + " for topic: " + topic, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close
+    }
+
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing. Nothing to configure
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
new file mode 100644
index 0000000..a9fb7e1
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link CommandNotification}s.  A command notification is
+ * written as a JSON object with three properties: "command" (the name of the {@link Command}),
+ * "type" (the simple class name of the wrapped notification) and "notification" (the wrapped
+ * notification itself).  This adapter is used in {@link CommandNotificationSerializer} for
+ * producing and consuming messages to and from Kafka.
+ *
+ */
+public class CommandNotificationTypeAdapter
+        implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> {
+
+    /**
+     * Writes the command name, then the type tag and the body of the wrapped notification.
+     *
+     * @throws IllegalArgumentException if the wrapped notification is neither a
+     *             {@link PeriodicNotification} nor a {@link BasicNotification}
+     */
+    @Override
+    public JsonElement serialize(CommandNotification src, Type typeOfSrc, JsonSerializationContext context) {
+        final JsonObject json = new JsonObject();
+        json.add("command", new JsonPrimitive(src.getCommand().name()));
+
+        final Notification wrapped = src.getNotification();
+        final String typeTag;
+        final JsonElement body;
+        // PeriodicNotification is tested first, exactly as before, so the order of the checks is kept.
+        if (wrapped instanceof PeriodicNotification) {
+            typeTag = PeriodicNotification.class.getSimpleName();
+            body = new PeriodicNotificationTypeAdapter().serialize((PeriodicNotification) wrapped,
+                    PeriodicNotification.class, context);
+        } else if (wrapped instanceof BasicNotification) {
+            typeTag = BasicNotification.class.getSimpleName();
+            body = new BasicNotificationTypeAdapter().serialize((BasicNotification) wrapped,
+                    BasicNotification.class, context);
+        } else {
+            throw new IllegalArgumentException("Invalid notification type.");
+        }
+
+        json.add("type", new JsonPrimitive(typeTag));
+        json.add("notification", body);
+        return json;
+    }
+
+    /**
+     * Reads the command and the type tag, then delegates the "notification" property to the
+     * adapter that matches the type tag.
+     *
+     * @throws JsonParseException if the type tag names neither {@link PeriodicNotification}
+     *             nor {@link BasicNotification}
+     */
+    @Override
+    public CommandNotification deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context)
+            throws JsonParseException {
+
+        final JsonObject json = element.getAsJsonObject();
+        final Command command = Command.valueOf(json.get("command").getAsString());
+        final String typeTag = json.get("type").getAsString();
+
+        if (typeTag.equals(PeriodicNotification.class.getSimpleName())) {
+            final Notification periodic = new PeriodicNotificationTypeAdapter()
+                    .deserialize(json.get("notification"), PeriodicNotification.class, context);
+            return new CommandNotification(command, periodic);
+        }
+        if (typeTag.equals(BasicNotification.class.getSimpleName())) {
+            final Notification basic = new BasicNotificationTypeAdapter()
+                    .deserialize(json.get("notification"), BasicNotification.class, context);
+            return new CommandNotification(command, basic);
+        }
+        throw new JsonParseException("Cannot deserialize Json");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
new file mode 100644
index 0000000..fcc0ba2
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link PeriodicNotification}s.  A notification is written
+ * as a JSON object with the properties "id", "period", "initialDelay" and "timeUnit".  This
+ * adapter is used in {@link CommandNotificationTypeAdapter}, which in turn is used in
+ * {@link CommandNotificationSerializer} for producing and consuming messages to and from Kafka.
+ *
+ */
+public class PeriodicNotificationTypeAdapter
+        implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> {
+
+    /**
+     * Writes the id, period, initial delay and the name() of the time unit of the notification.
+     */
+    @Override
+    public JsonElement serialize(PeriodicNotification notification, Type typeOfSrc, JsonSerializationContext context) {
+        final JsonObject json = new JsonObject();
+        json.add("id", new JsonPrimitive(notification.getId()));
+        json.add("period", new JsonPrimitive(notification.getPeriod()));
+        json.add("initialDelay", new JsonPrimitive(notification.getInitialDelay()));
+        json.add("timeUnit", new JsonPrimitive(notification.getTimeUnit().name()));
+        return json;
+    }
+
+    /**
+     * Reads "id", "period", "timeUnit" and "initialDelay" from the given JSON object and builds a
+     * {@link PeriodicNotification} from them.
+     */
+    @Override
+    public PeriodicNotification deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context)
+            throws JsonParseException {
+        final JsonObject json = element.getAsJsonObject();
+
+        // The properties are read in the same order as before so a malformed message fails on the same property.
+        final String id = json.get("id").getAsString();
+        final long period = json.get("period").getAsLong();
+        final String unitName = json.get("timeUnit").getAsString();
+        final TimeUnit unit = TimeUnit.valueOf(unitName);
+        final long delay = json.get("initialDelay").getAsLong();
+
+        final Builder notification = PeriodicNotification.builder().id(id).period(period)
+                .initialDelay(delay).timeUnit(unit);
+        return notification.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
index 20a0647..402f81d 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
@@ -1,22 +1,14 @@
 <?xml version="1.0" encoding="utf-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+    license agreements. See the NOTICE file distributed with this work for additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    you under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
@@ -54,7 +46,6 @@ under the License.
         <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.periodic.service.notification</artifactId>
-            <version>${project.version}</version>
             <exclusions>
                 <exclusion>
                     <artifactId>logback-classic</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
index cb7557c..9109775 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -21,7 +21,6 @@ package org.apache.rya.periodic.notification.application;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -34,15 +33,16 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 
 import javax.xml.datatype.DatatypeConfigurationException;
 import javax.xml.datatype.DatatypeFactory;
 
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -52,21 +52,27 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.kafka.base.EmbeddedKafkaInstance;
+import org.apache.rya.kafka.base.EmbeddedKafkaSingleton;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
 import org.apache.rya.periodic.notification.notification.CommandNotification;
-import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
 import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
 import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.Value;
@@ -81,14 +87,9 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;;
+
 
 public class PeriodicNotificationApplicationIT extends RyaExportITBase {
 
@@ -97,45 +98,38 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
     private KafkaProducer<String, CommandNotification> producer;
     private Properties props;
     private Properties kafkaProps;
-    PeriodicNotificationApplicationConfiguration conf;
+    private PeriodicNotificationApplicationConfiguration conf;
+    private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
+    private static String bootstrapServers;
+    
+    @Rule
+    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
     
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private ZkUtils zkUtils;
-    private KafkaServer kafkaServer;
-    private EmbeddedZookeeper zkServer;
-    private ZkClient zkClient;
+    @BeforeClass
+    public static void initClass() {
+        bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+    }
     
     @Before
     public void init() throws Exception {
-        setUpKafka();
+        String topic = rule.getKafkaTopicName();
+        rule.createTopic(topic);
+        
+        //load the user-specified properties, then override them with the embedded Kafka bootstrap servers and the rule-generated topic
         props = getProps();
+        props.setProperty(NOTIFICATION_TOPIC, topic);
+        props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
         conf = new PeriodicNotificationApplicationConfiguration(props);
+        
+        //create Kafka Producer
         kafkaProps = getKafkaProperties(conf);
-        app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
         producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());
+        
+        //create the periodic notification application and the Kafka notification registration client
+        app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
         registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
     }
     
-    private void setUpKafka() throws Exception {
-        // Setup Kafka.
-        zkServer = new EmbeddedZookeeper();
-        final String zkConnect = ZKHOST + ":" + zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-        zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Brokersparql
-        final Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
-        final KafkaConfig config = new KafkaConfig(brokerProps);
-        final Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-    }
-    
     @Test
     public void periodicApplicationWithAggAndGroupByTest() throws Exception {
 
@@ -185,10 +179,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
             Connector connector = ConfigUtils.getConnector(conf);
             PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
             CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
-            String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
             addData(statements);
             app.start();
-//            
+           
             Multimap<Long, BindingSet> actual = HashMultimap.create();
             try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
                 consumer.subscribe(Arrays.asList(id));
@@ -321,10 +315,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
             Connector connector = ConfigUtils.getConnector(conf);
             PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
             CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
-            String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
             addData(statements);
             app.start();
-//            
+            
             Multimap<Long, BindingSet> expected = HashMultimap.create();
             try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
                 consumer.subscribe(Arrays.asList(id));
@@ -411,10 +405,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
             Connector connector = ConfigUtils.getConnector(conf);
             PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
             CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
-            String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
             addData(statements);
             app.start();
-//            
+           
             Multimap<Long, BindingSet> expected = HashMultimap.create();
             try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
                 consumer.subscribe(Arrays.asList(id));
@@ -458,13 +452,6 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
     public void shutdown() {
         registrar.close();
         app.stop();
-        teardownKafka();
-    }
-    
-    private void teardownKafka() {
-        kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
     }
     
     private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
@@ -473,20 +460,17 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
             InsertTriples inserter = new InsertTriples();
             statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
             getMiniFluo().waitForObservers();
-//            FluoITHelper.printFluoTable(fluo);
         }
-
     }
 
-    private Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
+    private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
         Properties kafkaProps = new Properties();
-        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
-        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
+        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         return kafkaProps;
     }
-
     
     private Properties getProps() throws IOException {
         

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
index cf24974..e05ca6f 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -31,10 +31,11 @@ import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.notification.TimestampedNotification;
 import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
+import org.junit.Assert;
 import org.junit.Test;
 import org.openrdf.query.MalformedQueryException;
 
-import org.junit.Assert;
+import com.google.common.collect.Sets;
 
 public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
 
@@ -55,7 +56,7 @@ public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
         
         String id = null;
         try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
-            id = pcj.createPcj(sparql, fluo).getQueryId();
+            id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId();
             provider.processRegisteredNotifications(coord, fluo.newSnapshot());
         }
         

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
index c5dc809..874e7e2 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.kafka.base.KafkaITBase;
 import org.apache.rya.kafka.base.KafkaTestInstanceRule;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
 import org.junit.Assert;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
index fa60e48..21109ae 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
@@ -30,8 +30,8 @@ import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
 import org.apache.rya.periodic.notification.notification.PeriodicNotification;
 import org.apache.rya.periodic.notification.notification.TimestampedNotification;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
index bb98b7f..830fa46 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -39,9 +39,11 @@ import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
@@ -50,9 +52,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableItera
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
 import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.notification.PeriodicNotification;
 import org.junit.Assert;
 import org.junit.Test;
 import org.openrdf.model.Statement;
@@ -85,8 +85,7 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
         PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
                 getRyaInstanceName());
         CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage);
-        PeriodicNotification notification = createPeriodicQuery.createPeriodicQuery(sparql);
-        String queryId = notification.getId();
+        String queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId());
 
         // create statements to ingest into Fluo
         final ValueFactory vf = new ValueFactoryImpl();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
index bde406f..522e69d 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
@@ -18,31 +18,44 @@
  */package org.apache.rya.periodic.notification.registration.kafka;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.log4j.BasicConfigurator;
-import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.apache.rya.kafka.base.KafkaITBase;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
 import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.notification.CommandNotification;
 import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
 import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
-import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
-public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
+public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
 
-    private static final String topic = "topic";
     private KafkaNotificationRegistrationClient registration;
     private PeriodicNotificationCoordinatorExecutor coord;
     private KafkaNotificationProvider provider;
+    private String bootstrapServer;
+    
+    @Rule
+    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
+    
+    @Before
+    public void init() throws Exception {
+        bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+    }
 
     @Test
     public void kafkaNotificationProviderTest() throws InterruptedException {
@@ -52,6 +65,9 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
         BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
         Properties props = createKafkaConfig();
         KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+        String topic = rule.getKafkaTopicName();
+        rule.createTopic(topic);
+        
         registration = new KafkaNotificationRegistrationClient(topic, producer);
         coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
         provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -80,6 +96,9 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
         BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
         Properties props = createKafkaConfig();
         KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+        String topic = rule.getKafkaTopicName();
+        rule.createTopic(topic);
+        
         registration = new KafkaNotificationRegistrationClient(topic, producer);
         coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
         provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -108,8 +127,8 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
 
     private Properties createKafkaConfig() {
         Properties props = new Properties();
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml b/extras/rya.periodic.service/periodic.service.notification/pom.xml
index 2173888..1e59e15 100644
--- a/extras/rya.periodic.service/periodic.service.notification/pom.xml
+++ b/extras/rya.periodic.service/periodic.service.notification/pom.xml
@@ -1,107 +1,112 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
-		contributor license agreements. See the NOTICE file distributed with this 
-		work for additional information regarding copyright ownership. The ASF licenses 
-		this file to you under the Apache License, Version 2.0 (the "License"); you 
-		may not use this file except in compliance with the License. You may obtain 
-		a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
-		required by applicable law or agreed to in writing, software distributed 
-		under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
-		OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
-		the specific language governing permissions and limitations under the License. -->
-	<parent>
-		<groupId>org.apache.rya</groupId>
-		<artifactId>rya.periodic.service</artifactId>
-		<version>3.2.11-incubating-SNAPSHOT</version>
-	</parent>
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+        contributor license agreements. See the NOTICE file distributed with this 
+        work for additional information regarding copyright ownership. The ASF licenses 
+        this file to you under the Apache License, Version 2.0 (the "License"); you 
+        may not use this file except in compliance with the License. You may obtain 
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+        required by applicable law or agreed to in writing, software distributed 
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+        the specific language governing permissions and limitations under the License. -->
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.service</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
 
-	<artifactId>rya.periodic.service.notification</artifactId>
-	
-	<name>Apache Rya Periodic Service Notification</name>
+    <artifactId>rya.periodic.service.notification</artifactId>
+
+    <name>Apache Rya Periodic Service Notification</name>
     <description>Notifications for Rya Periodic Service</description>
 
-	<dependencies>
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.twill</groupId>
+            <artifactId>twill-api</artifactId>
+            <version>0.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.twill</groupId>
+            <artifactId>twill-yarn</artifactId>
+            <version>0.11.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kafka_2.10</artifactId>
+                    <groupId>org.apache.kafka</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-query</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing.pcj</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.app</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.service.api</artifactId>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.twill</groupId>
-			<artifactId>twill-api</artifactId>
-			<version>0.11.0</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.twill</groupId>
-			<artifactId>twill-yarn</artifactId>
-			<version>0.11.0</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>kafka_2.10</artifactId>
-					<groupId>org.apache.kafka</groupId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>com.google.code.gson</groupId>
-			<artifactId>gson</artifactId>
-			<version>2.8.0</version>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.fluo</groupId>
-			<artifactId>fluo-api</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.fluo</groupId>
-			<artifactId>fluo-core</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.rya</groupId>
-			<artifactId>rya.indexing</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.openrdf.sesame</groupId>
-			<artifactId>sesame-query</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.rya</groupId>
-			<artifactId>rya.indexing.pcj</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.rya</groupId>
-			<artifactId>rya.pcj.fluo.app</artifactId>
-		</dependency>
-	</dependencies>
+    </dependencies>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<encoding>UTF-8</encoding>
-					<source>1.8</source>
-					<target>1.8</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>3.0.0</version>
-				<executions>
-					<execution>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <encoding>UTF-8</encoding>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
deleted file mode 100644
index 571ee1c..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.openrdf.query.Binding;
-
-/**
- * Object that cleans up old {@link BindingSet}s corresponding to the specified
- * {@link NodeBin}. Implementations delete all BindingSets with the bin
- * indicated by {@link NodeBin#getBin()}.  A BindingSet corresponds to a given
- * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
- * and value equal to the given bin.
- *
- */
-public interface BinPruner {
-    
-    /**
-     * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}.
-     * @param bin - NodeBin that indicates which BindingSets to delete.
-     */
-    public void pruneBindingSetBin(NodeBin bin);
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
deleted file mode 100644
index 500a435..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
-import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
-
-/**
- * An Object that is used to export {@link BindingSetRecord}s to an external repository or queuing system.
- *
- */
-public interface BindingSetExporter {
-
-    /**
-     * Exports the given record to the external repository or queuing system
-     * that this BindingSetExporter is configured to export to.
-     * @param bindingSet - {@link BindingSetRecord} to be exported
-     * @throws ResultExportException if the export cannot be completed (exact conditions are implementation-defined)
-     */
-    public void exportNotification(BindingSetRecord bindingSet) throws ResultExportException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
deleted file mode 100644
index 60a3e7c..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import java.util.Optional;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
-import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
-import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication;
-import org.apache.rya.periodic.notification.notification.PeriodicNotification;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.evaluation.function.Function;
-
-import com.google.common.collect.Sets;
-
-/**
- * Object that creates a Periodic Query.  A Periodic Query is any query
- * requesting periodic updates about events that occurred within a given
- * window of time of this instant. This is also known as a rolling window
- * query.  Periodic Queries can be expressed using SPARQL by including the
- * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI}
- * in the query.  The user must provide this Function with the following arguments:
- * the temporal variable in the query that will be filtered on, the window of time
- * that events must occur within, the period at which the user wants to receive updates,
- * and the time unit.  The following query requests all observations that occurred
- * within the last minute and requests updates every 15 seconds.  It also performs
- * a count on those observations.
- * <pre>
- * prefix function: http://org.apache.rya/function#
- * prefix time: http://www.w3.org/2006/time#
- * select (count(?obs) as ?total) where {
- *     Filter(function:periodic(?time, 1, .25, time:minutes))
- *     ?obs uri:hasTime ?time.
- *     ?obs uri:hasId ?id }
- * </pre>
- * 
- * This class is responsible for taking a Periodic Query expressed as a SPARQL query
- * and adding it to Fluo and Kafka so that it can be processed by the {@link PeriodicNotificationApplication}.
- */
-public class CreatePeriodicQuery {
-
-    private FluoClient fluoClient;
-    private PeriodicQueryResultStorage periodicStorage;
-    Function funciton; // NOTE(review): misspelling of "function"; never referenced inside this class
-    PeriodicQueryUtil util; // NOTE(review): never referenced inside this class; only static PeriodicQueryUtil calls are made
-    
-    
-    public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) {
-        this.fluoClient = fluoClient;
-        this.periodicStorage = periodicStorage;
-    }
-    
-    /**
-     * Creates a Periodic Query by registering it with Fluo under a new PCJ id and creating a {@link PeriodicQueryResultStorage} table with that id.
-     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
-     * @return PeriodicNotification that can be used to register this query with the {@link PeriodicNotificationApplication}.
-     * @throws RuntimeException if the query has no PeriodicQuery Filter, or wrapping a malformed-query, storage or unsupported-query failure
-     */
-    public PeriodicNotification createPeriodicQuery(String sparql) {
-        try {
-            Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
-            if(optNode.isPresent()) {
-                PeriodicQueryNode periodicNode = optNode.get();
-                String pcjId = FluoQueryUtils.createNewPcjId();
-               
-                //register query with Fluo
-                CreateFluoPcj createPcj = new CreateFluoPcj();
-                createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluoClient);
-                
-                //register query with PeriodicResultStorage table
-                periodicStorage.createPeriodicQuery(pcjId, sparql);
-                //create notification
-                PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod())
-                        .timeUnit(periodicNode.getUnit()).build();
-                return notification;
-            } else {
-                throw new RuntimeException("Invalid PeriodicQuery.  Query must possess a PeriodicQuery Filter.");
-            }
-        } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    /**
-     * Creates a Periodic Query by adding the query to Fluo and using the resulting
-     * Fluo id to create a {@link PeriodicQueryResultStorage} table.  In addition, this
-     * method registers the PeriodicQuery with the PeriodicNotificationApplication to poll
-     * the PeriodicQueryResultStorage table at regular intervals and export results to Kafka.
-     * The PeriodicNotificationApp queries the result table at a regular interval indicated by the Period of
-     * the PeriodicQuery.
-     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
-     * @param periodicClient - registers the PeriodicQuery with the {@link PeriodicNotificationApplication}
-     * @return id of the PeriodicQuery and PeriodicQueryResultStorage table (these are the same)
-     */
-    public String createQueryAndRegisterWithKafka(String sparql, PeriodicNotificationClient periodicClient) {
-        PeriodicNotification notification = createPeriodicQuery(sparql);
-        periodicClient.addNotification(notification);
-        return notification.getId();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
deleted file mode 100644
index b1e8bad..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-/**
- * Interface providing basic life cycle functionality,
- * including stopping and starting any class implementing this
- * interface and checking whether it is running.
- *
- */
-public interface LifeCycle {
-
-    /**
-     * Starts the application.
-     */
-    public void start();
-
-    /**
-     * Stops a running application.
-     */
-    public void stop();
-    
-    /**
-     * Determines if the application is currently running.
-     * @return true if application is running and false otherwise.
-     */
-    public boolean currentlyRunning();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
deleted file mode 100644
index 3ed7979..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import java.util.Objects;
-
-/**
- * Object used to indicate the id of a given Periodic Query
- * along with a particular bin of results.  This Object is used
- * by the {@link BinPruner} to clean up old query results after
- * they have been processed.
- *
- */
-public class NodeBin {
-
-    private long bin;
-    private String nodeId;
-
-    public NodeBin(String nodeId, long bin) {
-        this.bin = bin;
-        this.nodeId = nodeId;
-    }
-
-    /**
-     * @return id of Periodic Query
-     */
-    public String getNodeId() {
-        return nodeId;
-    }
-    /**
-     * @return bin id of results for a given Periodic Query
-     */
-    public long getBin() {
-        return bin;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (this == other) {
-            return true;
-        }
-
-        if (other instanceof NodeBin) {
-            NodeBin bin = (NodeBin) other;
-            return this.bin == bin.bin && this.nodeId.equals(bin.nodeId); // NOTE(review): throws NullPointerException if this.nodeId is null; the constructor does not reject null
-        }
-
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(bin, nodeId);
-    }
-
-    @Override
-    public String toString() {
-        return new StringBuilder().append("Node Bin \n").append("   QueryId: " + nodeId + "\n").append("   Bin: " + bin + "\n").toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
deleted file mode 100644
index 3e9e0d1..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-/**
- * Notification used by the Periodic Query Service
- * to inform workers to process the results of the Periodic
- * Query with the indicated id.
- *
- */
-public interface Notification {
-
-    /**
-     * @return id of the Periodic Query that this notification refers to
-     */
-    public String getId();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
deleted file mode 100644
index d53dc17..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-
-/**
- * Object that manages the periodic notifications for the Periodic Query Service.
- * This Object processes new requests for periodic updates by registering them with
- * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}).
- *
- */
-public interface NotificationCoordinatorExecutor extends LifeCycle {
-
-    /**
-     * Registers a {@link CommandNotification} with, or deletes it from, the periodic service that
-     * generates notifications at the regular interval indicated by the CommandNotification.
-     * @param notification - CommandNotification to be registered or deleted from the periodic update
-     * service.
-     */
-    public void processNextCommandNotification(CommandNotification notification);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
deleted file mode 100644
index 4ac9089..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import org.apache.rya.periodic.notification.notification.TimestampedNotification;
-
-/**
- * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}.
- * It is expected that the NotificationCoordinatorExecutor will provide this Object with notifications to perform
- * work via some sort of queuing service such as a BlockingQueue or Kafka.  This Object processes the notifications by
- * retrieving query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them
- * and then providing them to another service to be exported.
- *
- */
-public interface NotificationProcessor {
-
-    /**
-     * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results
-     * associated with the query id given by {@link TimestampedNotification#getId()}.
-     * @param notification - contains information about which query results to retrieve
-     */
-    public void processNotification(TimestampedNotification notification);
-    
-}