You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/09 21:48:33 UTC
[16/50] [abbrv] incubator-rya git commit: RYA-377 Add ListQueries
command
RYA-377 Add ListQueries command
Added command for listing queries
Added Integration Test for the command
Added default implementation for the interactor
Added default usage to command interface
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/adc44fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/adc44fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/adc44fd3
Branch: refs/heads/master
Commit: adc44fd369b4f54497fa7a4d1ef099d166effbf3
Parents: a8b511b
Author: Andrew Smith <sm...@gmail.com>
Authored: Fri Oct 27 17:08:47 2017 -0400
Committer: caleb <ca...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500
----------------------------------------------------------------------
.../rya/streams/api/interactor/ListQueries.java | 4 +-
.../interactor/defaults/DefaultListQueries.java | 54 ++++++++
.../rya/streams/client/RyaStreamsCommand.java | 9 +-
.../client/command/ListQueriesCommand.java | 124 +++++++++++++++++++
.../client/command/ListQueryCommandIT.java | 60 +++++++++
5 files changed, 248 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java
index 4cab856..5d03f5c 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/ListQueries.java
@@ -18,7 +18,7 @@
*/
package org.apache.rya.streams.api.interactor;
-import java.util.List;
+import java.util.Set;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
@@ -38,5 +38,5 @@ public interface ListQueries {
* @return All of the queries that are managed.
* @throws RyaStreamsException The queries could not be listed.
*/
- public List<StreamsQuery> all() throws RyaStreamsException;
+ public Set<StreamsQuery> all() throws RyaStreamsException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
new file mode 100644
index 0000000..82ca691
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultListQueries.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor.defaults;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.queries.QueryRepository;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Lists all queries in Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class DefaultListQueries implements ListQueries {
+ private final QueryRepository repository;
+
+ /**
+ * Creates a new {@link DefaultListQueries}.
+ *
+ * @param repository - The {@link QueryRepository} to list queries from. (not
+ * null)
+ */
+ public DefaultListQueries(final QueryRepository repository) {
+ this.repository = requireNonNull(repository);
+ }
+
+ @Override
+ public Set<StreamsQuery> all() throws RyaStreamsException {
+ return repository.list();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
index c4c55e8..967b79e 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/RyaStreamsCommand.java
@@ -18,6 +18,7 @@
*/
package org.apache.rya.streams.client;
+import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.base.Strings;
@@ -80,7 +81,13 @@ public interface RyaStreamsCommand {
/**
* @return Describes what arguments may be provided to the command.
*/
- public String getUsage();
+ default public String getUsage() {
+ final JCommander parser = new JCommander(new Parameters());
+
+ final StringBuilder usage = new StringBuilder();
+ parser.usage(usage);
+ return usage.toString();
+ }
/**
* Execute the command using the command line arguments.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
new file mode 100644
index 0000000..ec40b50
--- /dev/null
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+import org.apache.rya.streams.api.interactor.ListQueries;
+import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.client.RyaStreamsCommand;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A command that lists all queries currently in Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ListQueriesCommand implements RyaStreamsCommand {
+ private static final Logger log = LoggerFactory.getLogger(ListQueriesCommand.class);
+
+ @Override
+ public String getCommand() {
+ return "list-queries";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Lists all queries currently in Rya Streams.";
+ }
+
+ @Override
+ public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+ requireNonNull(args);
+
+ // Parse the command line arguments.
+ final Parameters params = new Parameters();
+ try {
+ new JCommander(params, args);
+ } catch (final ParameterException e) {
+ throw new ArgumentsException("Could not list the queries because of invalid command line parameters.", e);
+ }
+ log.trace("Executing the List Query Command.\n" + params.toString());
+
+ // Create properties for interacting with Kafka.
+ final Properties producerProperties = new Properties();
+ producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
+ producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, params.kafkaIP + ":" + params.kafkaPort);
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());
+ final Producer<?, QueryChange> queryProducer = new KafkaProducer<>(producerProperties);
+ final Consumer<?, QueryChange> queryConsumer = new KafkaConsumer<>(consumerProperties);
+
+ final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, params.topicName);
+ final QueryRepository repo = new InMemoryQueryRepository(changeLog);
+ final ListQueries listQueries = new DefaultListQueries(repo);
+ try {
+ final Set<StreamsQuery> queries = listQueries.all();
+ logQueries(queries);
+ } catch (final RyaStreamsException e) {
+ log.error("Unable to retrieve the queries.", e);
+ }
+ }
+
+ private void logQueries(final Set<StreamsQuery> queries) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("\n");
+ sb.append("Queries in Rya Streams:\n");
+ sb.append("---------------------------------------------------------\n");
+ queries.forEach(query -> {
+ sb.append("ID: ");
+ sb.append(query.getQueryId());
+ sb.append("\t\t");
+ sb.append("Query: ");
+ sb.append(query.getSparql());
+ sb.append("\n");
+ });
+ log.trace(sb.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/adc44fd3/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
new file mode 100644
index 0000000..be90c5f
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Integration test for listing queries through a command.
+ */
+public class ListQueryCommandIT extends KafkaITBase {
+ private String[] args;
+
+ @Rule
+ public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+
+ @Before
+ public void setup() {
+ final Properties props = rule.createBootstrapServerConfig();
+ final String location = props.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ final String[] tokens = location.split(":");
+ args = new String[] {
+ "-t", rule.getKafkaTopicName(),
+ "-p", tokens[1],
+ "-i", tokens[0]
+ };
+ }
+
+ @Test
+ public void happyListQueriesTest() throws Exception {
+ final ListQueriesCommand command = new ListQueriesCommand();
+ command.execute(args);
+ // TODO: not sure what to assert here; for now this only verifies that execute() does not throw.
+ assertEquals(true, true);
+ }
+}