You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/04 13:14:06 UTC
[2/4] incubator-rya git commit: RYA-319-Integration of Periodic Query
with CLI. Closes #220.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
new file mode 100644
index 0000000..bb438be
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * Implementation of {@link PeriodicNotificationClient} that registers and removes notification
+ * requests with the PeriodicQueryService by publishing {@link CommandNotification}s to a Kafka topic.
+ */
+public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient {
+
+    private final KafkaProducer<String, CommandNotification> producer;
+    private final String topic;
+
+    /**
+     * Creates a client that publishes commands to the given topic.
+     *
+     * @param topic Kafka topic that the commands are written to (not null)
+     * @param producer producer used to publish the commands; closed by {@link #close()} (not null)
+     * @throws NullPointerException if either argument is null
+     */
+    public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) {
+        this.topic = Objects.requireNonNull(topic, "topic");
+        this.producer = Objects.requireNonNull(producer, "producer");
+    }
+
+    /** Publishes a {@link Command#ADD} command wrapping the given notification. */
+    @Override
+    public void addNotification(PeriodicNotification notification) {
+        processNotification(new CommandNotification(Command.ADD, notification));
+    }
+
+    /** Publishes a {@link Command#DELETE} command wrapping the given notification. */
+    @Override
+    public void deleteNotification(BasicNotification notification) {
+        processNotification(new CommandNotification(Command.DELETE, notification));
+    }
+
+    /** Publishes a {@link Command#DELETE} command for the notification with the given id. */
+    @Override
+    public void deleteNotification(String notificationId) {
+        processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId)));
+    }
+
+    /**
+     * Builds a {@link PeriodicNotification} from the given values and publishes a
+     * {@link Command#ADD} command wrapping it.
+     */
+    @Override
+    public void addNotification(String id, long period, long delay, TimeUnit unit) {
+        Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build();
+        processNotification(new CommandNotification(Command.ADD, notification));
+    }
+
+    /** Sends the command to the configured topic, using the command's id as the record key. */
+    private void processNotification(CommandNotification notification) {
+        producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification));
+    }
+
+    /** Closes the underlying Kafka producer. */
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
new file mode 100644
index 0000000..bd29d29
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link BasicNotification}s. A notification is represented
+ * as a JSON object with a single {@code id} member holding the notification's id.
+ */
+public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> {
+
+    /**
+     * Writes the notification as a JSON object whose {@code id} member holds the notification's id.
+     *
+     * @param arg0 notification to serialize
+     * @param arg1 declared type of the source object (unused)
+     * @param arg2 serialization context (unused)
+     * @return JSON object holding the notification's id
+     */
+    @Override
+    public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) {
+        final JsonPrimitive id = new JsonPrimitive(arg0.getId());
+        final JsonObject serialized = new JsonObject();
+        serialized.add("id", id);
+        return serialized;
+    }
+
+    /**
+     * Rebuilds a notification from the {@code id} member of the given JSON object.
+     *
+     * @param arg0 JSON object produced by {@link #serialize}
+     * @param arg1 requested target type (unused)
+     * @param arg2 deserialization context (unused)
+     * @return notification carrying the id read from the JSON
+     */
+    @Override
+    public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException {
+        final String id = arg0.getAsJsonObject().get("id").getAsString();
+        return new BasicNotification(id);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
new file mode 100644
index 0000000..50180ad
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.primitives.Bytes;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for {@link BindingSet}s. The serialized form is
+ * the ';'-joined variable order in UTF-8, followed by a delimiter, followed by the binding set bytes
+ * produced by {@link AccumuloPcjSerializer}.
+ */
+public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> {
+
+    private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
+    private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();
+    // Encoded with an explicit charset so the delimiter does not depend on the platform default.
+    private static final byte[] DELIM_BYTE = "\u0002".getBytes(StandardCharsets.UTF_8);
+
+    /**
+     * Serializes the binding set, returning an empty array (and logging the cause) on failure.
+     */
+    private byte[] toBytes(BindingSet bindingSet) {
+        try {
+            return getBytes(getVarOrder(bindingSet), bindingSet);
+        } catch (Exception e) {
+            log.warn("Unable to serialize BindingSet: " + bindingSet, e);
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Deserializes bytes written by {@link #toBytes(BindingSet)}, returning an empty
+     * {@link QueryBindingSet} (and logging the cause) when the bytes cannot be decoded.
+     */
+    private BindingSet fromBytes(byte[] bsBytes) {
+        try {
+            int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
+            if (firstIndex < 0) {
+                throw new IllegalArgumentException("Variable order delimiter not found.");
+            }
+            byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
+            // Skip the whole delimiter rather than assuming that it is exactly one byte long.
+            byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + DELIM_BYTE.length, bsBytes.length);
+            VariableOrder varOrder = new VariableOrder(new String(varOrderBytes, StandardCharsets.UTF_8).split(";"));
+            return getBindingSet(varOrder, bsBytesNoVarOrder);
+        } catch (Exception e) {
+            // Arrays.toString prints the content; concatenating the array would only print its identity.
+            log.warn("Unable to deserialize BindingSet: " + Arrays.toString(bsBytes), e);
+            return new QueryBindingSet();
+        }
+    }
+
+    /** Derives the variable order from the binding names of the binding set. */
+    private VariableOrder getVarOrder(BindingSet bs) {
+        return new VariableOrder(bs.getBindingNames());
+    }
+
+    /** Concatenates the UTF-8 variable order, the delimiter and the converted binding set. */
+    private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws BindingSetConversionException {
+        byte[] bsBytes = serializer.convert(bs, varOrder);
+        String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
+        byte[] varOrderBytes = varOrderString.getBytes(StandardCharsets.UTF_8);
+        return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
+    }
+
+    /** Converts the binding set bytes back into a {@link BindingSet} using the given variable order. */
+    private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException {
+        return serializer.convert(bsBytes, varOrder);
+    }
+
+    /** Deserializes a binding set; undecodable input yields an empty {@link QueryBindingSet}. */
+    @Override
+    public BindingSet deserialize(String topic, byte[] bytes) {
+        return fromBytes(bytes);
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close.
+    }
+
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing. Nothing to configure.
+    }
+
+    /** Serializes the binding set; a failure yields an empty array. */
+    @Override
+    public byte[] serialize(String topic, BindingSet bs) {
+        return toBytes(bs);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
new file mode 100644
index 0000000..302e1be
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s
+ * to and from Kafka. Commands are exchanged as UTF-8 encoded JSON.
+ */
+public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> {
+
+    private static final Gson gson = new GsonBuilder()
+            .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create();
+    private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class);
+
+    /**
+     * Parses a command from UTF-8 encoded JSON.
+     *
+     * @param topic topic the record was read from
+     * @param bytes serialized command; may be null
+     * @return the parsed command, or null when {@code bytes} is null
+     */
+    @Override
+    public CommandNotification deserialize(String topic, byte[] bytes) {
+        if (bytes == null) {
+            // The Kafka Deserializer contract allows a null payload and recommends not throwing for it.
+            LOG.debug("Received null notification payload for topic: {}", topic);
+            return null;
+        }
+        // StandardCharsets.UTF_8 cannot fail, unlike the charset-name lookup it replaces.
+        return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), CommandNotification.class);
+    }
+
+    /**
+     * Renders the command as UTF-8 encoded JSON.
+     *
+     * @param topic topic the record is written to
+     * @param command command to serialize
+     * @return UTF-8 bytes of the command's JSON form
+     */
+    @Override
+    public byte[] serialize(String topic, CommandNotification command) {
+        return gson.toJson(command).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close
+    }
+
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing. Nothing to configure
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
new file mode 100644
index 0000000..a9fb7e1
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link CommandNotification}s, used in
+ * {@link CommandNotificationSerializer} for producing and consuming messages to and from Kafka.
+ * A command is a JSON object with the members {@code command}, {@code type} and {@code notification}.
+ */
+public class CommandNotificationTypeAdapter
+        implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> {
+
+    /**
+     * Writes the command name, the simple class name of the wrapped notification and the
+     * notification itself.
+     *
+     * @throws IllegalArgumentException if the wrapped notification is neither a
+     *             {@link PeriodicNotification} nor a {@link BasicNotification}
+     */
+    @Override
+    public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) {
+        JsonObject result = new JsonObject();
+        result.add("command", new JsonPrimitive(arg0.getCommand().name()));
+        Notification notification = arg0.getNotification();
+        if (notification instanceof PeriodicNotification) {
+            result.add("type", new JsonPrimitive(PeriodicNotification.class.getSimpleName()));
+            PeriodicNotificationTypeAdapter adapter = new PeriodicNotificationTypeAdapter();
+            result.add("notification",
+                    adapter.serialize((PeriodicNotification) notification, PeriodicNotification.class, arg2));
+        } else if (notification instanceof BasicNotification) {
+            result.add("type", new JsonPrimitive(BasicNotification.class.getSimpleName()));
+            BasicNotificationTypeAdapter adapter = new BasicNotificationTypeAdapter();
+            result.add("notification",
+                    adapter.serialize((BasicNotification) notification, BasicNotification.class, arg2));
+        } else {
+            throw new IllegalArgumentException("Invalid notification type.");
+        }
+        return result;
+    }
+
+    /**
+     * Rebuilds a command from the JSON written by {@link #serialize}.
+     *
+     * @throws JsonParseException if the command, type or notification member is missing, or the command or type is not recognized
+     */
+    @Override
+    public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2)
+            throws JsonParseException {
+
+        JsonObject json = arg0.getAsJsonObject();
+        Command command = parseCommand(requireMember(json, "command").getAsString());
+        String type = requireMember(json, "type").getAsString();
+        JsonElement payload = requireMember(json, "notification");
+        Notification notification;
+        if (type.equals(PeriodicNotification.class.getSimpleName())) {
+            notification = (new PeriodicNotificationTypeAdapter()).deserialize(payload,
+                    PeriodicNotification.class, arg2);
+        } else if (type.equals(BasicNotification.class.getSimpleName())) {
+            notification = (new BasicNotificationTypeAdapter()).deserialize(payload,
+                    BasicNotification.class, arg2);
+        } else {
+            throw new JsonParseException("Cannot deserialize Json");
+        }
+
+        return new CommandNotification(command, notification);
+    }
+
+    /** Maps the command name to its {@link Command}, rejecting unknown names as a parse error. */
+    private static Command parseCommand(String name) {
+        try {
+            return Command.valueOf(name);
+        } catch (IllegalArgumentException e) {
+            throw new JsonParseException("Unknown command: " + name, e);
+        }
+    }
+
+    /** Returns the named member, failing with a parse error instead of a NullPointerException when absent. */
+    private static JsonElement requireMember(JsonObject json, String name) {
+        JsonElement member = json.get(name);
+        if (member == null || member.isJsonNull()) {
+            throw new JsonParseException("Missing required member: " + name);
+        }
+        return member;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
new file mode 100644
index 0000000..fcc0ba2
--- /dev/null
+++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Gson serializer and deserializer for {@link PeriodicNotification}s. This adapter is used in
+ * {@link CommandNotificationTypeAdapter}, which is used in {@link CommandNotificationSerializer}
+ * for producing and consuming messages to and from Kafka.
+ */
+public class PeriodicNotificationTypeAdapter
+        implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> {
+
+    /**
+     * Rebuilds a notification from the members {@code id}, {@code period}, {@code timeUnit} and
+     * {@code initialDelay} of the given JSON object.
+     *
+     * @throws JsonParseException if a member is missing or {@code timeUnit} names no {@link TimeUnit}
+     */
+    @Override
+    public PeriodicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2)
+            throws JsonParseException {
+
+        JsonObject json = arg0.getAsJsonObject();
+        String id = requireMember(json, "id").getAsString();
+        long period = requireMember(json, "period").getAsLong();
+        TimeUnit periodTimeUnit = parseTimeUnit(requireMember(json, "timeUnit").getAsString());
+        long initialDelay = requireMember(json, "initialDelay").getAsLong();
+        Builder builder = PeriodicNotification.builder().id(id).period(period)
+                .initialDelay(initialDelay).timeUnit(periodTimeUnit);
+
+        return builder.build();
+    }
+
+    /**
+     * Writes the notification's id, period, initial delay and time unit name as members of a
+     * JSON object.
+     */
+    @Override
+    public JsonElement serialize(PeriodicNotification arg0, Type arg1, JsonSerializationContext arg2) {
+
+        JsonObject result = new JsonObject();
+        result.add("id", new JsonPrimitive(arg0.getId()));
+        result.add("period", new JsonPrimitive(arg0.getPeriod()));
+        result.add("initialDelay", new JsonPrimitive(arg0.getInitialDelay()));
+        result.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name()));
+
+        return result;
+    }
+
+    /** Maps the name to its {@link TimeUnit}, rejecting unknown names as a parse error. */
+    private static TimeUnit parseTimeUnit(String name) {
+        try {
+            return TimeUnit.valueOf(name);
+        } catch (IllegalArgumentException e) {
+            throw new JsonParseException("Unknown time unit: " + name, e);
+        }
+    }
+
+    /** Returns the named member, failing with a parse error instead of a NullPointerException when absent. */
+    private static JsonElement requireMember(JsonObject json, String name) {
+        JsonElement member = json.get(name);
+        if (member == null || member.isJsonNull()) {
+            throw new JsonParseException("Missing required member: " + name);
+        }
+        return member;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
index 20a0647..402f81d 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml
@@ -1,22 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -54,7 +46,6 @@ under the License.
<dependency>
<groupId>org.apache.rya</groupId>
<artifactId>rya.periodic.service.notification</artifactId>
- <version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>logback-classic</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
index cb7557c..9109775 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -21,7 +21,6 @@ package org.apache.rya.periodic.notification.application;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.Files;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@@ -34,15 +33,16 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -52,21 +52,27 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.kafka.base.EmbeddedKafkaInstance;
+import org.apache.rya.kafka.base.EmbeddedKafkaSingleton;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
import org.apache.rya.periodic.notification.notification.CommandNotification;
-import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.model.Value;
@@ -81,14 +87,9 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;
+
public class PeriodicNotificationApplicationIT extends RyaExportITBase {
@@ -97,45 +98,38 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
private KafkaProducer<String, CommandNotification> producer;
private Properties props;
private Properties kafkaProps;
- PeriodicNotificationApplicationConfiguration conf;
+ private PeriodicNotificationApplicationConfiguration conf;
+ private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
+ private static String bootstrapServers;
+
+ @Rule
+ public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
- private static final String ZKHOST = "127.0.0.1";
- private static final String BROKERHOST = "127.0.0.1";
- private static final String BROKERPORT = "9092";
- private ZkUtils zkUtils;
- private KafkaServer kafkaServer;
- private EmbeddedZookeeper zkServer;
- private ZkClient zkClient;
+ @BeforeClass
+ public static void initClass() {
+ bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ }
@Before
public void init() throws Exception {
- setUpKafka();
+ String topic = rule.getKafkaTopicName();
+ rule.createTopic(topic);
+
+ //get user specified props and update with the embedded kafka bootstrap servers and rule generated topic
props = getProps();
+ props.setProperty(NOTIFICATION_TOPIC, topic);
+ props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
conf = new PeriodicNotificationApplicationConfiguration(props);
+
+ //extract Kafka client properties from the application config and create the Kafka producer
kafkaProps = getKafkaProperties(conf);
- app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());
+
+ //create the periodic notification application and the Kafka registration client
+ app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
}
- private void setUpKafka() throws Exception {
- // Setup Kafka.
- zkServer = new EmbeddedZookeeper();
- final String zkConnect = ZKHOST + ":" + zkServer.port();
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
- zkUtils = ZkUtils.apply(zkClient, false);
-
- // setup Brokersparql
- final Properties brokerProps = new Properties();
- brokerProps.setProperty("zookeeper.connect", zkConnect);
- brokerProps.setProperty("broker.id", "0");
- brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
- brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
- final KafkaConfig config = new KafkaConfig(brokerProps);
- final Time mock = new MockTime();
- kafkaServer = TestUtils.createServer(config, mock);
- }
-
@Test
public void periodicApplicationWithAggAndGroupByTest() throws Exception {
@@ -185,10 +179,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
Connector connector = ConfigUtils.getConnector(conf);
PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
- String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+ String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
addData(statements);
app.start();
-//
+
Multimap<Long, BindingSet> actual = HashMultimap.create();
try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
consumer.subscribe(Arrays.asList(id));
@@ -321,10 +315,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
Connector connector = ConfigUtils.getConnector(conf);
PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
- String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+ String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
addData(statements);
app.start();
-//
+
Multimap<Long, BindingSet> expected = HashMultimap.create();
try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
consumer.subscribe(Arrays.asList(id));
@@ -411,10 +405,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
Connector connector = ConfigUtils.getConnector(conf);
PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
- String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar);
+ String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
addData(statements);
app.start();
-//
+
Multimap<Long, BindingSet> expected = HashMultimap.create();
try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
consumer.subscribe(Arrays.asList(id));
@@ -458,13 +452,6 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
public void shutdown() {
registrar.close();
app.stop();
- teardownKafka();
- }
-
- private void teardownKafka() {
- kafkaServer.shutdown();
- zkClient.close();
- zkServer.shutdown();
}
private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
@@ -473,20 +460,17 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase {
InsertTriples inserter = new InsertTriples();
statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
getMiniFluo().waitForObservers();
-// FluoITHelper.printFluoTable(fluo);
}
-
}
- private Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
+ private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
Properties kafkaProps = new Properties();
- kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
- kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return kafkaProps;
}
-
private Properties getProps() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
index cf24974..e05ca6f 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -31,10 +31,11 @@ import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
+import org.junit.Assert;
import org.junit.Test;
import org.openrdf.query.MalformedQueryException;
-import org.junit.Assert;
+import com.google.common.collect.Sets;
public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
@@ -55,7 +56,7 @@ public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
String id = null;
try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
- id = pcj.createPcj(sparql, fluo).getQueryId();
+ id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId();
provider.processRegisteredNotifications(coord, fluo.newSnapshot());
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
index c5dc809..874e7e2 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.kafka.base.KafkaITBase;
import org.apache.rya.kafka.base.KafkaTestInstanceRule;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.junit.Assert;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
index fa60e48..21109ae 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
@@ -30,8 +30,8 @@ import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
import org.apache.rya.periodic.notification.notification.PeriodicNotification;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
index bb98b7f..830fa46 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -39,9 +39,11 @@ import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
@@ -50,9 +52,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableItera
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.apache.rya.periodic.notification.api.CreatePeriodicQuery;
import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.notification.PeriodicNotification;
import org.junit.Assert;
import org.junit.Test;
import org.openrdf.model.Statement;
@@ -85,8 +85,7 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
getRyaInstanceName());
CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage);
- PeriodicNotification notification = createPeriodicQuery.createPeriodicQuery(sparql);
- String queryId = notification.getId();
+ String queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId());
// create statements to ingest into Fluo
final ValueFactory vf = new ValueFactoryImpl();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
index bde406f..522e69d 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
@@ -18,31 +18,44 @@
*/package org.apache.rya.periodic.notification.registration.kafka;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.BasicConfigurator;
-import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.apache.rya.kafka.base.KafkaITBase;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
-import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
-public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
+public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
- private static final String topic = "topic";
private KafkaNotificationRegistrationClient registration;
private PeriodicNotificationCoordinatorExecutor coord;
private KafkaNotificationProvider provider;
+ private String bootstrapServer;
+
+ @Rule
+ public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
+
+ @Before
+ public void init() throws Exception {
+ bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ }
@Test
public void kafkaNotificationProviderTest() throws InterruptedException {
@@ -52,6 +65,9 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
Properties props = createKafkaConfig();
KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+ String topic = rule.getKafkaTopicName();
+ rule.createTopic(topic);
+
registration = new KafkaNotificationRegistrationClient(topic, producer);
coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -80,6 +96,9 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
Properties props = createKafkaConfig();
KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+ String topic = rule.getKafkaTopicName();
+ rule.createTopic(topic);
+
registration = new KafkaNotificationRegistrationClient(topic, producer);
coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
@@ -108,8 +127,8 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase {
private Properties createKafkaConfig() {
Properties props = new Properties();
- props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
- props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml b/extras/rya.periodic.service/periodic.service.notification/pom.xml
index 2173888..1e59e15 100644
--- a/extras/rya.periodic.service/periodic.service.notification/pom.xml
+++ b/extras/rya.periodic.service/periodic.service.notification/pom.xml
@@ -1,107 +1,112 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <!-- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with this
- work for additional information regarding copyright ownership. The ASF licenses
- this file to you under the Apache License, Version 2.0 (the "License"); you
- may not use this file except in compliance with the License. You may obtain
- a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
- required by applicable law or agreed to in writing, software distributed
- under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
- OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the License. -->
- <parent>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.periodic.service</artifactId>
- <version>3.2.11-incubating-SNAPSHOT</version>
- </parent>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF licenses
+ this file to you under the Apache License, Version 2.0 (the "License"); you
+ may not use this file except in compliance with the License. You may obtain
+ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+ required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.service</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
- <artifactId>rya.periodic.service.notification</artifactId>
-
- <name>Apache Rya Periodic Service Notification</name>
+ <artifactId>rya.periodic.service.notification</artifactId>
+
+ <name>Apache Rya Periodic Service Notification</name>
<description>Notifications for Rya Periodic Service</description>
- <dependencies>
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-api</artifactId>
+ <version>0.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.twill</groupId>
+ <artifactId>twill-yarn</artifactId>
+ <version>0.11.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>kafka_2.10</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-query</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing.pcj</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.pcj.fluo.app</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.service.api</artifactId>
+ </dependency>
- <dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-api</artifactId>
- <version>0.11.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-yarn</artifactId>
- <version>0.11.0</version>
- <exclusions>
- <exclusion>
- <artifactId>kafka_2.10</artifactId>
- <groupId>org.apache.kafka</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.indexing</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-query</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.indexing.pcj</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rya</groupId>
- <artifactId>rya.pcj.fluo.app</artifactId>
- </dependency>
- </dependencies>
+ </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
deleted file mode 100644
index 571ee1c..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.openrdf.query.Binding;
-
-/**
- * Object that cleans up old {@link BindingSet}s corresponding to the specified
- * {@link NodeBin}. This class deletes all BindingSets with the bin
- * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given
- * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}
- * and value equal to the given bin.
- *
- */
-public interface BinPruner {
-
- /**
- * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}.
- * @param bin - NodeBin that indicates which BindingSets to delete..
- */
- public void pruneBindingSetBin(NodeBin bin);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
deleted file mode 100644
index 500a435..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
-import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
-
-/**
- * An Object that is used to export {@link BindingSet}s to an external repository or queuing system.
- *
- */
-public interface BindingSetExporter {
-
- /**
- * This method exports the BindingSet to the external repository or queuing system
- * that this BindingSetExporter is configured to export to.
- * @param bindingSet - {@link BindingSet} to be exported
- * @throws ResultExportException
- */
- public void exportNotification(BindingSetRecord bindingSet) throws ResultExportException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
deleted file mode 100644
index 60a3e7c..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import java.util.Optional;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
-import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
-import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication;
-import org.apache.rya.periodic.notification.notification.PeriodicNotification;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.evaluation.function.Function;
-
-import com.google.common.collect.Sets;
-
-/**
- * Object that creates a Periodic Query. A Periodic Query is any query
- * requesting periodic updates about events that occurred within a given
- * window of time of this instant. This is also known as a rolling window
- * query. Periodic Queries can be expressed using SPARQL by including the
- * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI}
- * in the query. The user must provide this Function with the following arguments:
- * the temporal variable in the query that will be filtered on, the window of time
- * that events must occur within, the period at which the user wants to receive updates,
- * and the time unit. The following query requests all observations that occurred
- * within the last minute and requests updates every 15 seconds. It also performs
- * a count on those observations.
- * <li>
- * <li> prefix function: http://org.apache.rya/function#
- * <li> prefix time: http://www.w3.org/2006/time#
- * <li> select (count(?obs) as ?total) where {
- * <li> Filter(function:periodic(?time, 1, .25, time:minutes))
- * <li> ?obs uri:hasTime ?time.
- * <li> ?obs uri:hasId ?id }
- * <li>
- *
- * This class is responsible for taking a Periodic Query expressed as a SPARQL query
- * and adding it to Fluo and Kafka so that it can be processed by the {@link PeriodicNotificationApplication}.
- *
- * Instances are constructed from the {@link FluoClient} used to register the query and the
- * {@link PeriodicQueryResultStorage} in which the result table is created.
- *
- * NOTE(review): the package-private fields {@code funciton} (sic) and {@code util} are never
- * assigned or read anywhere in this class; they look like leftovers - confirm before reuse.
- */
-public class CreatePeriodicQuery {
-
- private FluoClient fluoClient;
- private PeriodicQueryResultStorage periodicStorage;
- Function funciton;
- PeriodicQueryUtil util;
-
-
- public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) {
- this.fluoClient = fluoClient;
- this.periodicStorage = periodicStorage;
- }
-
- /**
- * Creates a Periodic Query. A new PCJ id is generated, the query is registered with Fluo
- * under that id (using the RYA export strategy), and the same id is then used to create
- * the corresponding {@link PeriodicQueryResultStorage} table.
- * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
- * @return PeriodicNotification that can be used to register this query with the {@link PeriodicNotificationApplication}.
- * It carries the generated id together with the period and time unit taken from the query's periodic node.
- * @throws RuntimeException if the query contains no periodic node, or wrapping a
- * MalformedQueryException, PeriodicQueryStorageException or UnsupportedQueryException raised while creating the query
- */
- public PeriodicNotification createPeriodicQuery(String sparql) {
- try {
- Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
- if(optNode.isPresent()) {
- PeriodicQueryNode periodicNode = optNode.get();
- String pcjId = FluoQueryUtils.createNewPcjId();
-
- //register the query with Fluo under the newly generated id
- CreateFluoPcj createPcj = new CreateFluoPcj();
- createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluoClient);
-
- //register the query with the PeriodicQueryResultStorage table, reusing the same id
- periodicStorage.createPeriodicQuery(pcjId, sparql);
- //create the notification from the id and the period/unit of the periodic node
- PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod())
- .timeUnit(periodicNode.getUnit()).build();
- return notification;
- } else {
- throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter.");
- }
- } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Creates a Periodic Query via {@link #createPeriodicQuery(String)} and passes the resulting
- * notification to the given client. In addition to adding the query to Fluo and creating the
- * {@link PeriodicQueryResultStorage} table, this registers the PeriodicQuery with the
- * PeriodicNotificationApplication to poll the PeriodicQueryResultStorage table at regular
- * intervals and export results to Kafka.
- * The PeriodicNotificationApp queries the result table at a regular interval indicated by the Period of
- * the PeriodicQuery.
- * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
- * @param periodicClient - registers the PeriodicQuery with the {@link PeriodicNotificationApplication}
- * @return id of the PeriodicQuery and PeriodicQueryResultStorage table (these are the same)
- */
- public String createQueryAndRegisterWithKafka(String sparql, PeriodicNotificationClient periodicClient) {
- PeriodicNotification notification = createPeriodicQuery(sparql);
- periodicClient.addNotification(notification);
- return notification.getId();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
deleted file mode 100644
index b1e8bad..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-/**
- * Interface providing basic life cycle functionality,
- * including starting and stopping any class implementing this
- * interface and checking whether it is running.
- *
- */
-public interface LifeCycle {
-
- /**
- * Starts the application.
- */
- public void start();
-
- /**
- * Stops a running application.
- */
- public void stop();
-
- /**
- * Determines if the application is currently running.
- * @return true if application is running and false otherwise.
- */
- public boolean currentlyRunning();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
deleted file mode 100644
index 3ed7979..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import java.util.Objects;
-
-/**
- * Object used to indicate the id of a given Periodic Query
- * along with a particular bin of results. This Object is used
- * by the {@link BinPruner} to clean up old query results after
- * they have been processed.
- *
- * An instance is built from the query (node) id and the bin id. Two NodeBins are
- * equal when both their bin and their node id are equal, and the hash code is
- * computed from the same two values. {@code toString()} renders both values on
- * separate lines.
- *
- * NOTE(review): the constructor accepts a null node id, but {@code equals} calls
- * {@code nodeId.equals(...)} without a null check, whereas {@code hashCode} uses
- * {@code Objects.hash}, which tolerates null - confirm callers never pass null.
- *
- */
-public class NodeBin {
-
- private long bin;
- private String nodeId;
-
- public NodeBin(String nodeId, long bin) {
- this.bin = bin;
- this.nodeId = nodeId;
- }
-
- /**
- * @return id of Periodic Query
- */
- public String getNodeId() {
- return nodeId;
- }
-/**
- * @return bin id of results for a given Periodic Query
- */
- public long getBin() {
- return bin;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
-
- if (other instanceof NodeBin) {
- NodeBin bin = (NodeBin) other;
- return this.bin == bin.bin && this.nodeId.equals(bin.nodeId);
- }
-
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(bin, nodeId);
- }
-
- @Override
- public String toString() {
- return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
deleted file mode 100644
index 3e9e0d1..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-/**
- * Notification Object used by the Periodic Query Service
- * to inform workers to process results for the Periodic
- * Query with the indicated id.
- *
- */
-public interface Notification {
-
- /**
- * @return id of the Periodic Query that this notification refers to
- */
- public String getId();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
deleted file mode 100644
index d53dc17..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-
-/**
- * Object that manages the periodic notifications for the Periodic Query Service.
- * This Object processes new requests for periodic updates by registering them with
- * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}).
- * It also exposes the start/stop/running-check operations inherited from {@link LifeCycle}.
- *
- */
-public interface NotificationCoordinatorExecutor extends LifeCycle {
-
- /**
- * Registers or deletes a {@link CommandNotification} with the periodic service, which
- * generates notifications at a regular interval indicated by the CommandNotification.
- * @param notification - CommandNotification to be registered with or deleted from the periodic update
- * service.
- */
- public void processNextCommandNotification(CommandNotification notification);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
deleted file mode 100644
index 4ac9089..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.api;
-
-import org.apache.rya.periodic.notification.notification.TimestampedNotification;
-
-/**
- * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}.
- * It is expected that the NotificationCoordinatorExecutor will supply this Object with notifications to perform work
- * via some sort of queuing service such as a BlockingQueue or Kafka. This Object processes the notifications by retrieving
- * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them
- * and then providing them to another service to be exported.
- *
- */
-public interface NotificationProcessor {
-
- /**
- * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results
- * associated with the query id given by {@link TimestampedNotification#getId()}.
- * @param notification - contains information about which query results to retrieve
- */
- public void processNotification(TimestampedNotification notification);
-
-}