You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/18 21:00:28 UTC
incubator-rya git commit: RYA-442 Implementing the Start and Stop
Query interactors. Closes #265.
Repository: incubator-rya
Updated Branches:
refs/heads/master 6ec8cd2aa -> 3d4a5d0e6
RYA-442 Implementing the Start and Stop Query interactors. Closes #265.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3d4a5d0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3d4a5d0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3d4a5d0e
Branch: refs/heads/master
Commit: 3d4a5d0e6e5766c2285fc5e359fb0d645818a8e9
Parents: 6ec8cd2
Author: kchilton2 <ke...@gmail.com>
Authored: Fri Jan 12 14:48:55 2018 -0500
Committer: caleb <ca...@parsons.com>
Committed: Thu Jan 18 14:28:24 2018 -0500
----------------------------------------------------------------------
.../rya/streams/api/entity/StreamsQuery.java | 19 ++-
.../rya/streams/api/interactor/AddQuery.java | 4 +-
.../interactor/defaults/DefaultAddQuery.java | 4 +-
.../interactor/defaults/DefaultStartQuery.java | 54 +++++++
.../interactor/defaults/DefaultStopQuery.java | 54 +++++++
.../api/queries/InMemoryQueryRepository.java | 119 ++++++++++-----
.../rya/streams/api/queries/QueryChange.java | 46 +++++-
.../streams/api/queries/QueryRepository.java | 15 +-
.../defaults/DefaultAddQueryTest.java | 6 +-
.../queries/InMemoryQueryRepositoryTest.java | 39 +++--
.../streams/client/command/AddQueryCommand.java | 6 +-
.../client/command/AddQueryCommandIT.java | 6 +-
.../client/command/DeleteQueryCommandIT.java | 146 +++++++++----------
.../client/command/ListQueryCommandIT.java | 12 +-
.../client/command/RunQueryCommandIT.java | 2 +-
.../kafka/queries/KafkaQueryChangeLog.java | 25 ++--
.../kafka/interactor/KafkaRunQueryIT.java | 2 +-
.../kafka/queries/KafkaQueryChangeLogIT.java | 38 ++++-
18 files changed, 424 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 8239025..bd750a6 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,16 +34,19 @@ public class StreamsQuery {
private final UUID queryId;
private final String sparql;
+ private final boolean isActive;
/**
* Constructs an instance of {@link StreamsQuery}.
*
* @param queryId - Uniquely identifies the query within Rya Streams. (not null)
* @param sparql - The SPARQL query that defines how statements will be processed. (not null)
+ * @param isActive - {@code true} if Rya Streams should process this query; otherwise {@code false}.
*/
- public StreamsQuery(final UUID queryId, final String sparql) {
+ public StreamsQuery(final UUID queryId, final String sparql, final boolean isActive) {
this.queryId = requireNonNull(queryId);
this.sparql = requireNonNull(sparql);
+ this.isActive = isActive;
}
/**
@@ -60,9 +63,16 @@ public class StreamsQuery {
return sparql;
}
+ /**
+ * @return {@code true} if Rya Streams should process this query; otherwise {@code false}.
+ */
+ public boolean isActive() {
+ return isActive;
+ }
+
@Override
public int hashCode() {
- return Objects.hash(queryId, sparql);
+ return Objects.hash(queryId, sparql, isActive);
}
@Override
@@ -70,7 +80,8 @@ public class StreamsQuery {
if(o instanceof StreamsQuery) {
final StreamsQuery other = (StreamsQuery) o;
return Objects.equals(queryId, other.queryId) &&
- Objects.equals(sparql, other.sparql);
+ Objects.equals(sparql, other.sparql) &&
+ isActive == other.isActive;
}
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
index 8915d98..9889fd0 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
@@ -34,8 +34,10 @@ public interface AddQuery {
* Adds a query to the Rya Streams system.
*
* @param query - The SPARQL query that will be added. (not null)
+ * @param isActive - {@code true} if the query needs to be maintained by
+ * Rya Streams; otherwise {@code false}.
* @return The {@link StreamsQuery} used by Rya Streams for this query.
* @throws RyaStreamsException The query could not be added to Rya Streams.
*/
- public StreamsQuery addQuery(final String query) throws RyaStreamsException;
+ public StreamsQuery addQuery(final String query, boolean isActive) throws RyaStreamsException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
index f94835c..edd90fd 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
@@ -49,7 +49,7 @@ public class DefaultAddQuery implements AddQuery {
}
@Override
- public StreamsQuery addQuery(final String query) throws RyaStreamsException {
+ public StreamsQuery addQuery(final String query, final boolean isActive) throws RyaStreamsException {
requireNonNull(query);
// Make sure the SPARQL is valid.
@@ -60,6 +60,6 @@ public class DefaultAddQuery implements AddQuery {
}
// If it is, then store it in the repository.
- return repository.add(query);
+ return repository.add(query, isActive);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java
new file mode 100644
index 0000000..3ee693e
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStartQuery.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.UUID;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.StartQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Start a query that is managed by Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultStartQuery implements StartQuery {
+
+ private final QueryRepository repository;
+
+ /**
+ * Constructs an instance of {@link DefaultStartQuery}.
+ *
+ * @param repository - The {@link QueryRepository} that will be interacted with. (not null)
+ */
+ public DefaultStartQuery(final QueryRepository repository) {
+ this.repository = requireNonNull(repository);
+ }
+
+ @Override
+ public void start(final UUID queryId) throws RyaStreamsException {
+ requireNonNull(queryId);
+ repository.updateIsActive(queryId, true);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java
new file mode 100644
index 0000000..382b0f1
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultStopQuery.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.UUID;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.StopQuery;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Stop a query that is managed by Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultStopQuery implements StopQuery {
+
+ private final QueryRepository repository;
+
+ /**
+ * Constructs an instance of {@link DefaultStopQuery}.
+ *
+ * @param repository - The {@link QueryRepository} that will be interacted with. (not null)
+ */
+ public DefaultStopQuery(final QueryRepository repository) {
+ this.repository = requireNonNull(repository);
+ }
+
+ @Override
+ public void stop(final UUID queryId) throws RyaStreamsException {
+ requireNonNull(queryId);
+ repository.updateIsActive(queryId, false);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 80678de..f4b7b25 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -33,16 +33,15 @@ import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import info.aduna.iteration.CloseableIteration;
/**
- * An in memory implementation of {@link QueryRepository}. It is lazily initialized the first time one of its
- * functions is invoked.
+ * An in memory implementation of {@link QueryRepository}. It is lazily
+ * initialized the first time one of its functions is invoked and it updates
+ * its view of the {@link QueryChangeLog} any time a method is invoked that
+ * requires the latest view of the queries.
* </p>
* Thread safe.
*/
@@ -51,39 +50,47 @@ public class InMemoryQueryRepository implements QueryRepository {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class);
private final ReentrantLock lock = new ReentrantLock();
- private final Supplier<Map<UUID, StreamsQuery>> queriesCache;
+ /**
+ * The change log that is the ground truth for describing what the queries look like.
+ */
private final QueryChangeLog changeLog;
/**
+ * Represents the position within the {@link QueryChangeLog} that {@code queriesCache} represents.
+ */
+ private Optional<Long> cachePosition = Optional.empty();
+
+ /**
+ * The most recently cached view of the queries within this repository.
+ */
+ private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>();
+
+ /**
* Constructs an instance of {@link InMemoryQueryRepository}.
*
* @param changeLog - The change log that this repository will maintain and be based on. (not null)
*/
public InMemoryQueryRepository(final QueryChangeLog changeLog) {
this.changeLog = requireNonNull(changeLog);
-
- // Lazily initialize the queries cache the first time you try to use it.
- queriesCache = Suppliers.memoize(() -> initializeCache(changeLog));
}
@Override
- public StreamsQuery add(final String query) throws QueryRepositoryException {
+ public StreamsQuery add(final String query, final boolean isActive) throws QueryRepositoryException {
requireNonNull(query);
lock.lock();
try {
// First record the change to the log.
final UUID queryId = UUID.randomUUID();
- final QueryChange change = QueryChange.create(queryId, query);
+ final QueryChange change = QueryChange.create(queryId, query, isActive);
changeLog.write(change);
- // Then update the view of the change log within the repository.
- final StreamsQuery streamsQuery = new StreamsQuery(queryId, query);
- queriesCache.get().put(queryId, streamsQuery);
+ // Update the cache to represent what is currently in the log.
+ updateCache();
- // Return the SreamsQuery that represents the just added query.
- return streamsQuery;
+ // Return the StreamsQuery that represents the just added query.
+ return queriesCache.get(queryId);
} catch (final QueryChangeLogException e) {
throw new QueryRepositoryException("Could not create a Rya Streams query for the SPARQL string: " + query, e);
@@ -98,7 +105,35 @@ public class InMemoryQueryRepository implements QueryRepository {
lock.lock();
try {
- return Optional.ofNullable( queriesCache.get().get(queryId) );
+ // Update the cache to represent what is currently in the log.
+ updateCache();
+
+ return Optional.ofNullable( queriesCache.get(queryId) );
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void updateIsActive(final UUID queryId, final boolean isActive) throws QueryRepositoryException {
+ requireNonNull(queryId);
+
+ lock.lock();
+ try {
+ // Update the cache to represent what is currently in the log.
+ updateCache();
+
+ // Ensure the query is in the log.
+ if(!queriesCache.containsKey(queryId)) {
+ throw new QueryRepositoryException("No query exists for ID " + queryId + ".");
+ }
+
+ // First record the change to the log.
+ final QueryChange change = QueryChange.update(queryId, isActive);
+ changeLog.write(change);
+
+ } catch (final QueryChangeLogException e) {
+ throw new QueryRepositoryException("Could not update the Rya Streams query for with ID: " + queryId, e);
} finally {
lock.unlock();
}
@@ -114,9 +149,6 @@ public class InMemoryQueryRepository implements QueryRepository {
final QueryChange change = QueryChange.delete(queryId);
changeLog.write(change);
- // Then update the view of the change log within the repository.
- queriesCache.get().remove(queryId);
-
} catch (final QueryChangeLogException e) {
throw new QueryRepositoryException("Could not delete a Rya Streams query for the Query ID: " + queryId, e);
} finally {
@@ -128,8 +160,11 @@ public class InMemoryQueryRepository implements QueryRepository {
public Set<StreamsQuery> list() throws QueryRepositoryException {
lock.lock();
try {
- // Our internal cache is already up to date, so just return it's values.
- return queriesCache.get().values()
+ // Update the cache to represent what is currently in the log.
+ updateCache();
+
+ // Our internal cache is already up to date, so just return its values.
+ return queriesCache.values()
.stream()
.collect(Collectors.toSet());
@@ -149,22 +184,19 @@ public class InMemoryQueryRepository implements QueryRepository {
}
/**
- * A {@link Map} from query id to the {@link StreamsQuery} that is represented by that id based on what
- * is already in a {@link QueryChangeLog}.
- *
- * @param changeLog - The change log the cache will represent. (not null)
- * @return The most recent view of the change log.
+ * Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}.
*/
- private static Map<UUID, StreamsQuery> initializeCache(final QueryChangeLog changeLog) {
+ private void updateCache() {
requireNonNull(changeLog);
- // The Map that will be initialized and then returned by this supplier.
- final Map<UUID, StreamsQuery> queriesCache = new HashMap<>();
-
CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null;
try {
- // Iterate over everything that is already in the change log.
- it = changeLog.readFromStart();
+ // Iterate over everything since the last position that was handled within the change log.
+ if(cachePosition.isPresent()) {
+ it = changeLog.readFromPosition(cachePosition.get() + 1);
+ } else {
+ it = changeLog.readFromStart();
+ }
// Apply each change to the cache.
while(it.hasNext()) {
@@ -174,18 +206,31 @@ public class InMemoryQueryRepository implements QueryRepository {
switch(change.getChangeType()) {
case CREATE:
- final StreamsQuery query = new StreamsQuery(queryId, change.getSparql().get());
+ final StreamsQuery query = new StreamsQuery(
+ queryId,
+ change.getSparql().get(),
+ change.getIsActive().get());
queriesCache.put(queryId, query);
break;
+ case UPDATE:
+ if(queriesCache.containsKey(queryId)) {
+ final StreamsQuery old = queriesCache.get(queryId);
+ final StreamsQuery updated = new StreamsQuery(
+ old.getQueryId(),
+ old.getSparql(),
+ change.getIsActive().get());
+ queriesCache.put(queryId, updated);
+ }
+ break;
+
case DELETE:
queriesCache.remove(queryId);
break;
}
- }
- // Return the initialized cache.
- return queriesCache;
+ cachePosition = Optional.of( entry.getPosition() );
+ }
} catch (final QueryChangeLogException e) {
// Rethrow the exception because the object the supplier tried to create could not be created.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index 90af79c..d283957 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -37,9 +37,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
@DefaultAnnotation(NonNull.class)
public final class QueryChange implements Serializable {
private static final long serialVersionUID = 1L;
+
private final UUID queryId;
private final ChangeType changeType;
private final Optional<String> sparql;
+ private final Optional<Boolean> isActive;
/**
* Constructs an instance of {@link QueryChange}. Use the {@link #create(UUID, String)} or {@link #delete(UUID)}
@@ -48,14 +50,18 @@ public final class QueryChange implements Serializable {
* @param queryId - Uniquely identifies the query within Rya Streams. (not null)
* @param changeType - Indicates the type of change this object represents. (not null)
* @param sparql - If this is a create change, then the SPARQL query that will be evaluated within Rya Streams. (not null)
+ * @param isActive - If this is a create or update change, then the active state that defines if the
+ * query will be evaluated by RyaStreams. (not null)
*/
private QueryChange(
final UUID queryId,
final ChangeType changeType,
- final Optional<String> sparql) {
+ final Optional<String> sparql,
+ final Optional<Boolean> isActive) {
this.queryId = requireNonNull(queryId);
this.changeType = requireNonNull(changeType);
this.sparql = requireNonNull(sparql);
+ this.isActive = requireNonNull(isActive);
}
/**
@@ -79,9 +85,17 @@ public final class QueryChange implements Serializable {
return sparql;
}
+ /**
+ * @return If this is a create or update change, then the active state that defines if the
+ * query will be evaluated by RyaStreams. (not null)
+ */
+ public Optional<Boolean> getIsActive() {
+ return isActive;
+ }
+
@Override
public int hashCode() {
- return Objects.hash(queryId, changeType, sparql);
+ return Objects.hash(queryId, changeType, sparql, isActive);
}
@Override
@@ -90,7 +104,8 @@ public final class QueryChange implements Serializable {
final QueryChange change = (QueryChange) o;
return Objects.equals(queryId, change.queryId) &&
Objects.equals(changeType, change.changeType) &&
- Objects.equals(sparql, change.sparql);
+ Objects.equals(sparql, change.sparql) &&
+ Objects.equals(isActive, change.isActive);
}
return false;
}
@@ -100,10 +115,22 @@ public final class QueryChange implements Serializable {
*
* @param queryId - Uniquely identifies the query within the streaming system. (not null)
* @param sparql - The query that will be evaluated. (not null)
+ * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null)
+ * @return A {@link QueryChange} built using the provided values.
+ */
+ public static QueryChange create(final UUID queryId, final String sparql, final boolean isActive) {
+ return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql), Optional.of(isActive));
+ }
+
+ /**
+ * Create a {@link QueryChange} that represents a query in Rya Streams whose active state has changed.
+ *
+ * @param queryId - Uniquely identifies the query within the streaming system. (not null)
+ * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null)
* @return A {@link QueryChange} built using the provided values.
*/
- public static QueryChange create(final UUID queryId, final String sparql) {
- return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql));
+ public static QueryChange update(final UUID queryId, final boolean isActive) {
+ return new QueryChange(queryId, ChangeType.UPDATE, Optional.absent(), Optional.of(isActive));
}
/**
@@ -113,7 +140,7 @@ public final class QueryChange implements Serializable {
* @return A {@link QueryChange} built using the provided values.
*/
public static QueryChange delete(final UUID queryId) {
- return new QueryChange(queryId, ChangeType.DELETE, Optional.absent());
+ return new QueryChange(queryId, ChangeType.DELETE, Optional.absent(), Optional.absent());
}
/**
@@ -126,6 +153,11 @@ public final class QueryChange implements Serializable {
CREATE,
/**
+ * The {@link QueryChange} indicates something about a registered query changed.
+ */
+ UPDATE,
+
+ /**
* The {@link QueryChange} indicates a SPARQL query no longer needs to be processed by Rya Streams.
*/
DELETE;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index 7269588..fd51b2f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -38,10 +38,23 @@ public interface QueryRepository extends AutoCloseable {
* Adds a new query to Rya Streams.
*
* @param query - The SPARQL query to add. (not null)
+ * @param isActive - {@code true} if the query should be processed after it is added
+ * otherwise {@code false}.
* @return The {@link StreamsQuery} used in Rya Streams.
* @throws QueryRepositoryException Could not add the query.
*/
- public StreamsQuery add(final String query) throws QueryRepositoryException;
+ public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException;
+
+ /**
+ * Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true}
+ * means Rya Streams will start processing the query. Setting it to {@code false} will stop
+ * the processing.
+ *
+ * @param queryId - Identifies which query will be updated. (not null)
+ * @param isActive - The new isActive state for the query.
+ * @throws QueryRepositoryException If the query does not exist or something else caused the change to fail.
+ */
+ public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException;
/**
* Get an existing query from Rya Streams.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
index 88be6e7..77a0a15 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
@@ -43,10 +43,10 @@ public class DefaultAddQueryTest {
final AddQuery addQuery = new DefaultAddQuery(repo);
// Add the query.
- addQuery.addQuery(sparql);
+ addQuery.addQuery(sparql, true);
// Verify the call was forwarded to the repository.
- verify(repo, times(1)).add(eq(sparql));
+ verify(repo, times(1)).add(eq(sparql), eq(true));
}
@Test(expected = RyaStreamsException.class)
@@ -59,6 +59,6 @@ public class DefaultAddQueryTest {
final AddQuery addQuery = new DefaultAddQuery(repo);
// Add the query.
- addQuery.addQuery(sparql);
+ addQuery.addQuery(sparql, true);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 92193ca..22e616d 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -43,9 +43,9 @@ public class InMemoryQueryRepositoryTest {
try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
// Add some queries to it.
final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1") );
- expected.add( queries.add("query 2") );
- expected.add( queries.add("query 3") );
+ expected.add( queries.add("query 1", true) );
+ expected.add( queries.add("query 2", false) );
+ expected.add( queries.add("query 3", true) );
// Show they are in the list of all queries.
final Set<StreamsQuery> stored = queries.list();
@@ -59,9 +59,9 @@ public class InMemoryQueryRepositoryTest {
try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
// Add some queries to it. The second one we will delete.
final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1") );
- final UUID deletedMeId = queries.add("query 2").getQueryId();
- expected.add( queries.add("query 3") );
+ expected.add( queries.add("query 1", true) );
+ final UUID deletedMeId = queries.add("query 2", false).getQueryId();
+ expected.add( queries.add("query 3", true) );
// Delete the second query.
queries.delete( deletedMeId );
@@ -73,15 +73,15 @@ public class InMemoryQueryRepositoryTest {
}
@Test
- public void initializedWithPopulatedChnageLog() throws Exception {
+ public void initializedWithPopulatedChangeLog() throws Exception {
// Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later.
final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) {
// Add some queries and deletes to it.
final Set<StreamsQuery> expected = new HashSet<>();
- expected.add( queries.add("query 1") );
- final UUID deletedMeId = queries.add("query 2").getQueryId();
- expected.add( queries.add("query 3") );
+ expected.add( queries.add("query 1", true) );
+ final UUID deletedMeId = queries.add("query 2", false).getQueryId();
+ expected.add( queries.add("query 3", true) );
queries.delete( deletedMeId );
// Create a new totally in memory QueryRepository.
@@ -110,7 +110,7 @@ public class InMemoryQueryRepositoryTest {
// Setup a totally in memory QueryRepository.
try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
// Add a query to it.
- final StreamsQuery query = queries.add("query 1");
+ final StreamsQuery query = queries.add("query 1", true);
// Show the fetched query matches the expected ones.
final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
@@ -129,4 +129,21 @@ public class InMemoryQueryRepositoryTest {
assertFalse(query.isPresent());
}
}
+
+ @Test
+ public void update() throws Exception {
+ // Setup a totally in memory QueryRepository.
+ try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) {
+ // Add a query to it.
+ final StreamsQuery query = queries.add("query 1", true);
+
+ // Change the isActive state of that query.
+ queries.updateIsActive(query.getQueryId(), false);
+
+ // Show the fetched query matches the expected one.
+ final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+ final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
+ assertEquals(expected, fetched.get());
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index c72e6a2..275a975 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -52,6 +52,9 @@ public class AddQueryCommand implements RyaStreamsCommand {
@Parameter(names = { "--query", "-q" }, required = true, description = "The SPARQL query to add to Rya Streams.")
private String query;
+ @Parameter(names = {"--isActive", "-a"}, required = false, description = "True if the added query will be started.")
+ private String isActive;
+
@Override
public String toString() {
final StringBuilder parameters = new StringBuilder();
@@ -60,6 +63,7 @@ public class AddQueryCommand implements RyaStreamsCommand {
if (!Strings.isNullOrEmpty(query)) {
parameters.append("\tQuery: " + query + "\n");
}
+ parameters.append("\tIs Active: " + isActive + "\n");
return parameters.toString();
}
}
@@ -115,7 +119,7 @@ public class AddQueryCommand implements RyaStreamsCommand {
try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) {
final AddQuery addQuery = new DefaultAddQuery(queryRepo);
try {
- final StreamsQuery query = addQuery.addQuery(params.query);
+ final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive));
System.out.println("Added query: " + query.getSparql());
} catch (final RyaStreamsException e) {
System.err.println("Unable to parse query: " + params.query);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 3a412d2..8b4f074 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -80,7 +80,8 @@ public class AddQueryCommandIT {
"-r", "" + ryaInstance,
"-i", kafka.getKafkaHostname(),
"-p", kafka.getKafkaPort(),
- "-q", query
+ "-q", query,
+ "-a", "true"
};
// Execute the command.
@@ -101,7 +102,8 @@ public class AddQueryCommandIT {
"--ryaInstance", "" + ryaInstance,
"--kafkaHostname", kafka.getKafkaHostname(),
"--kafkaPort", kafka.getKafkaPort(),
- "--query", query
+ "--query", query,
+ "--isActive", "true"
};
// Execute the command.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 91647f2..6083543 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,7 +18,6 @@
*/
package org.apache.rya.streams.client.command;
-import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -40,6 +39,8 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -48,103 +49,90 @@ import org.junit.Test;
*/
public class DeleteQueryCommandIT {
+ private final String ryaInstance = UUID.randomUUID().toString();
+ private QueryRepository queryRepo;
+
@Rule
public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
- /**
- * This test simulates executing many commands and each of them use their own InMemoryQueryRepository. We need
- * to re-create the repo outside of the command to ensure it has the most up to date values inside of it.
- *
- * @param ryaInstance - The rya instance the repository is connected to. (not null)
- * @param createTopic - Set this to true if the topic doesn't exist yet.
- */
- private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
- requireNonNull(ryaInstance);
-
+ @Before
+ public void setup() {
// Make sure the topic that the change log uses exists.
- final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
- if(createTopic) {
- kafka.createTopic(changeLogTopic);
- }
+ final String changeLogTopic = KafkaTopics.queryChangeLogTopic(ryaInstance);
+ System.out.println("Test Change Log Topic: " + changeLogTopic);
+ kafka.createTopic(changeLogTopic);
// Setup the QueryRepository used by the test.
final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
- return new InMemoryQueryRepository(changeLog);
+ queryRepo = new InMemoryQueryRepository(changeLog);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ queryRepo.close();
}
@Test
public void shortParams() throws Exception {
- final String ryaInstance = UUID.randomUUID().toString();
-
// Add a few queries to Rya Streams.
- try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
- repo.add("query1");
- final UUID query2Id = repo.add("query2").getQueryId();
- repo.add("query3");
-
- // Show that all three of the queries were added.
- Set<StreamsQuery> queries = repo.list();
- assertEquals(3, queries.size());
-
- // Delete query 2 using the delete query command.
- final String[] deleteArgs = new String[] {
- "-r", "" + ryaInstance,
- "-i", kafka.getKafkaHostname(),
- "-p", kafka.getKafkaPort(),
- "-q", query2Id.toString()
- };
-
- final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
- deleteCommand.execute(deleteArgs);
-
- // Show query2 was deleted.
- try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
- queries = repo2.list();
- assertEquals(2, queries.size());
-
- for(final StreamsQuery query : queries) {
- assertNotEquals(query2Id, query.getQueryId());
- }
- }
+ queryRepo.add("query1", true);
+ final UUID query2Id = queryRepo.add("query2", false).getQueryId();
+ queryRepo.add("query3", true);
+
+ // Show that all three of the queries were added.
+ Set<StreamsQuery> queries = queryRepo.list();
+ assertEquals(3, queries.size());
+
+ // Delete query 2 using the delete query command.
+ final String[] deleteArgs = new String[] {
+ "-r", ryaInstance,
+ "-i", kafka.getKafkaHostname(),
+ "-p", kafka.getKafkaPort(),
+ "-q", query2Id.toString()
+ };
+
+ final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+ deleteCommand.execute(deleteArgs);
+
+ // Show query2 was deleted.
+ queries = queryRepo.list();
+ assertEquals(2, queries.size());
+
+ for(final StreamsQuery query : queries) {
+ assertNotEquals(query2Id, query.getQueryId());
}
}
@Test
public void longParams() throws Exception {
- final String ryaInstance = UUID.randomUUID().toString();
-
// Add a few queries to Rya Streams.
- try(QueryRepository repo = makeQueryRepository(ryaInstance, true)) {
- repo.add("query1");
- final UUID query2Id = repo.add("query2").getQueryId();
- repo.add("query3");
-
- // Show that all three of the queries were added.
- Set<StreamsQuery> queries = repo.list();
- assertEquals(3, queries.size());
-
- // Delete query 2 using the delete query command.
- final String[] deleteArgs = new String[] {
- "--ryaInstance", "" + ryaInstance,
- "--kafkaHostname", kafka.getKafkaHostname(),
- "--kafkaPort", kafka.getKafkaPort(),
- "--queryID", query2Id.toString()
- };
-
- final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
- deleteCommand.execute(deleteArgs);
-
- // Show query2 was deleted.
- try(QueryRepository repo2 = makeQueryRepository(ryaInstance, false)) {
- queries = repo2.list();
- assertEquals(2, queries.size());
-
- for(final StreamsQuery query : queries) {
- assertNotEquals(query2Id, query.getQueryId());
- }
- }
+ queryRepo.add("query1", true);
+ final UUID query2Id = queryRepo.add("query2", false).getQueryId();
+ queryRepo.add("query3", true);
+
+ // Show that all three of the queries were added.
+ Set<StreamsQuery> queries = queryRepo.list();
+ assertEquals(3, queries.size());
+
+ // Delete query 2 using the delete query command.
+ final String[] deleteArgs = new String[] {
+ "--ryaInstance", "" + ryaInstance,
+ "--kafkaHostname", kafka.getKafkaHostname(),
+ "--kafkaPort", kafka.getKafkaPort(),
+ "--queryID", query2Id.toString()
+ };
+
+ final DeleteQueryCommand deleteCommand = new DeleteQueryCommand();
+ deleteCommand.execute(deleteArgs);
+
+ // Show query2 was deleted.
+ queries = queryRepo.list();
+ assertEquals(2, queries.size());
+
+ for(final StreamsQuery query : queries) {
+ assertNotEquals(query2Id, query.getQueryId());
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index 00b4ce0..1399142 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -71,9 +71,9 @@ public class ListQueryCommandIT {
@Test
public void shortParams() throws Exception {
// Add a few queries to Rya Streams.
- queryRepo.add("query1");
- queryRepo.add("query2");
- queryRepo.add("query3");
+ queryRepo.add("query1", true);
+ queryRepo.add("query2", false);
+ queryRepo.add("query3", true);
// Execute the List Queries command.
final String[] args = new String[] {
@@ -89,9 +89,9 @@ public class ListQueryCommandIT {
@Test
public void longParams() throws Exception {
// Add a few queries to Rya Streams.
- queryRepo.add("query1");
- queryRepo.add("query2");
- queryRepo.add("query3");
+ queryRepo.add("query1", true);
+ queryRepo.add("query2", false);
+ queryRepo.add("query3", true);
// Execute the List Queries command.
final String[] args = new String[] {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index f2100e8..3389d6b 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -114,7 +114,7 @@ public class RunQueryCommandIT {
@Test
public void runQuery() throws Exception {
// Register a query with the Query Repository.
- final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+ final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
// Arguments that run the query we just registered with Rya Streams.
final String[] args = new String[] {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
index 9403e4b..2822272 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
@@ -140,18 +140,13 @@ public class KafkaQueryChangeLog implements QueryChangeLog {
@Override
public boolean hasNext() throws QueryChangeLogException {
- if (iterCache == null || !iterCache.hasNext()) {
- populateCache();
- }
+ maybePopulateCache();
return iterCache.hasNext();
}
@Override
public ChangeLogEntry<QueryChange> next() throws QueryChangeLogException {
- if (iterCache == null && iterCache.hasNext()) {
- populateCache();
- }
-
+ maybePopulateCache();
if (iterCache.hasNext()) {
return iterCache.next();
}
@@ -167,14 +162,14 @@ public class KafkaQueryChangeLog implements QueryChangeLog {
consumer.unsubscribe();
}
- private void populateCache() {
- final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L);
- final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>();
- records.forEach(
- record ->
- changes.add(new ChangeLogEntry<>(record.offset(), record.value()))
- );
- iterCache = changes.iterator();
+ private void maybePopulateCache() {
+ // If the cache isn't initialized yet, or it is empty, then check to see if there is more to put into it.
+ if (iterCache == null || !iterCache.hasNext()) {
+ final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L);
+ final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>();
+ records.forEach(record -> changes.add(new ChangeLogEntry<>(record.offset(), record.value())));
+ iterCache = changes.iterator();
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 33b3a92..9a773f0 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -86,7 +86,7 @@ public class KafkaRunQueryIT {
final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() );
// Add the query to the query repository.
- final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }");
+ final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
final UUID queryId = sQuery.getQueryId();
final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3d4a5d0e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
index 04c81ed..c2b821f 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -19,6 +19,7 @@
package org.apache.rya.streams.kafka.queries;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import java.util.ArrayList;
import java.util.List;
@@ -78,7 +79,7 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
public void testWrite() throws Exception {
final String sparql = "SOME QUERY HERE";
final UUID uuid = UUID.randomUUID();
- final QueryChange newChange = QueryChange.create(uuid, sparql);
+ final QueryChange newChange = QueryChange.create(uuid, sparql, true);
changeLog.write(newChange);
consumer.subscribe(Lists.newArrayList(topic));
@@ -90,6 +91,17 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
}
@Test
+ public void readSingleWrite() throws Exception {
+ // Write a single change to the log.
+ final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+ changeLog.write(change);
+
+ // Read that entry from the log.
+ final QueryChange readChange = changeLog.readFromStart().next().getEntry();
+ assertEquals(change, readChange);
+ }
+
+ @Test
public void readFromBegining() throws Exception {
final List<QueryChange> expected = write10ChangesToChangeLog();
@@ -175,12 +187,34 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
assertEquals(0, count);
}
+ @Test
+ public void multipleClients() throws Exception {
+ // Create a second KafkaQueryChangeLog objects that connect to the same change log.
+ final Producer<?, QueryChange> producer2 = KafkaTestUtil.makeProducer(rule, StringSerializer.class, QueryChangeSerializer.class);
+ final Consumer<?, QueryChange> consumer2 = KafkaTestUtil.fromStartConsumer(rule, StringDeserializer.class, QueryChangeDeserializer.class);
+ try(final KafkaQueryChangeLog changeLog2 = new KafkaQueryChangeLog(producer2, consumer2, topic)) {
+ // Show both of them report empty.
+ assertFalse( changeLog.readFromStart().hasNext() );
+ assertFalse( changeLog2.readFromStart().hasNext() );
+
+ // Write a change to the first log.
+ final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+ changeLog.write(change);
+
+ // Show it's in the first log.
+ assertEquals(change, changeLog.readFromStart().next().getEntry());
+
+ // Show it is also seen in the second log.
+ assertEquals(change, changeLog2.readFromStart().next().getEntry());
+ }
+ }
+
private List<QueryChange> write10ChangesToChangeLog() throws Exception {
final List<QueryChange> changes = new ArrayList<>();
for (int ii = 0; ii < 10; ii++) {
final String sparql = "SOME QUERY HERE_" + ii;
final UUID uuid = UUID.randomUUID();
- final QueryChange newChange = QueryChange.create(uuid, sparql);
+ final QueryChange newChange = QueryChange.create(uuid, sparql, true);
changeLog.write(newChange);
changes.add(newChange);
}