You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2018/03/09 18:21:07 UTC

[21/22] incubator-rya git commit: RYA-463 RYA-464 Added an isInsert flag to StreamsQuery to indicate when the results of a query need to be inserted back into Rya.

RYA-463 RYA-464 Added an isInsert flag to StreamsQuery to indicate when the results of a query need to be inserted back into Rya.

Conflicts:
	common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
	common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
	extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
	extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
	extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/010c6927
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/010c6927
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/010c6927

Branch: refs/heads/master
Commit: 010c6927647efccc9b19485ef0a931ea20846d8a
Parents: e07390d
Author: kchilton2 <ke...@gmail.com>
Authored: Thu Feb 8 17:53:41 2018 -0500
Committer: Valiyil <Pu...@parsons.com>
Committed: Fri Mar 9 13:00:05 2018 -0500

----------------------------------------------------------------------
 .../apache/rya/api/utils/QueryInvestigator.java |   2 +-
 .../rya/api/utils/QueryInvestigatorTest.java    |   2 +-
 .../rya/streams/api/entity/StreamsQuery.java    |  40 ++--
 .../rya/streams/api/interactor/AddQuery.java    |   4 +-
 .../interactor/defaults/DefaultAddQuery.java    |   4 +-
 .../api/queries/InMemoryQueryRepository.java    |  10 +-
 .../rya/streams/api/queries/QueryChange.java    |  35 +++-
 .../streams/api/queries/QueryRepository.java    |   6 +-
 .../defaults/DefaultAddQueryTest.java           |   6 +-
 .../queries/InMemoryQueryRepositoryTest.java    |  56 +++---
 .../streams/client/command/AddQueryCommand.java |  38 +++-
 .../client/command/ListQueriesCommand.java      |   7 +-
 .../client/command/AddQueryCommandIT.java       |  25 ++-
 .../client/command/DeleteQueryCommandIT.java    |  12 +-
 .../client/command/ListQueryCommandIT.java      |  12 +-
 .../client/command/RunQueryCommandIT.java       |   2 +-
 .../kafka/interactor/KafkaRunQueryIT.java       |   2 +-
 .../kafka/queries/KafkaQueryChangeLogIT.java    |   8 +-
 .../querymanager/LogEventWorkerTest.java        |  10 +-
 .../QueryEventWorkGeneratorTest.java            |  18 +-
 .../querymanager/QueryEventWorkerTest.java      |   2 +-
 .../streams/querymanager/QueryManagerTest.java  |  12 +-
 .../kafka/LocalQueryExecutorIT.java             |   2 +-
 .../kafka/LocalQueryExecutorTest.java           |  26 +--
 .../apache/rya/shell/RyaStreamsCommands.java    |  40 +++-
 .../rya/shell/util/StreamsQueryFormatter.java   |   1 +
 .../rya/shell/RyaStreamsCommandsTest.java       | 181 ++++++++++++++++---
 .../shell/util/StreamsQueryFormatterTest.java   |  12 +-
 28 files changed, 409 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
index 2fbd09b..a6c93e9 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java
@@ -100,4 +100,4 @@ public class QueryInvestigator {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
index fbe2ebe..bedc59a 100644
--- a/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
+++ b/common/rya.api/src/test/java/org/apache/rya/api/utils/QueryInvestigatorTest.java
@@ -147,4 +147,4 @@ public class QueryInvestigatorTest {
     public void isInsert_false_malformed() throws MalformedQueryException {
         assertFalse( QueryInvestigator.isInsertWhere("not sparql") );
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
index 11423bd..76cf6af 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
@@ -35,6 +35,7 @@ public class StreamsQuery {
     private final UUID queryId;
     private final String sparql;
     private final boolean isActive;
+    private final boolean isInsert;
 
     /**
      * Constructs an instance of {@link StreamsQuery}.
@@ -42,11 +43,18 @@ public class StreamsQuery {
      * @param queryId - Uniquely identifies the query within Rya Streams. (not null)
      * @param sparql - The SPARQL query that defines how statements will be processed. (not null)
      * @param isActive - {@code true} if Rya Streams should process this query; otherwise {@code false}.
+     * @param isInsert - {@code true} if Rya Streams should insert the results of the query back into
+     *   the Rya instance the statements originated from; otherwise {@code false}.
      */
-    public StreamsQuery(final UUID queryId, final String sparql, final boolean isActive) {
+    public StreamsQuery(
+            final UUID queryId,
+            final String sparql,
+            final boolean isActive,
+            final boolean isInsert) {
         this.queryId = requireNonNull(queryId);
         this.sparql = requireNonNull(sparql);
         this.isActive = isActive;
+        this.isInsert = isInsert;
     }
 
     /**
@@ -70,9 +78,17 @@ public class StreamsQuery {
         return isActive;
     }
 
+    /**
+     * @return {@code true} if Rya Streams should insert the results of the query back into
+     *   the Rya instance the statements originated from; otherwise {@code false}.
+     */
+    public boolean isInsert() {
+        return isInsert;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(queryId, sparql, isActive);
+        return Objects.hash(queryId, sparql, isActive, isInsert);
     }
 
     @Override
@@ -81,23 +97,17 @@ public class StreamsQuery {
             final StreamsQuery other = (StreamsQuery) o;
             return Objects.equals(queryId, other.queryId) &&
                     Objects.equals(sparql, other.sparql) &&
-                    isActive == other.isActive;
+                    isActive == other.isActive &&
+                    isInsert == other.isInsert;
         }
         return false;
     }
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("ID: ");
-        sb.append(getQueryId().toString() + "\n");
-        sb.append("Query: ");
-        sb.append(getSparql() + "\n");
-        sb.append("Is ");
-        if (!isActive) {
-            sb.append("Not ");
-        }
-        sb.append("Running.\n");
-        return sb.toString();
+        return "ID: " + queryId + "\n" +
+                "Query: " + sparql + "\n" +
+                "Is Active: " + isActive + "\n" +
+                "Is Insert: " + isInsert + "\n";
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
index 9889fd0..842399f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/AddQuery.java
@@ -36,8 +36,10 @@ public interface AddQuery {
      * @param query - The SPARQL query that will be added. (not null)
      * @param isActive - {@code true} if the query needs to be maintained by
      *   Rya Streams; otherwise {@code false}.
+     * @param isInsert - {@code true} if the query's reuslts need to be inserted into
+     *   the Rya instance that originated the statements; otherwise {@code false}.
      * @return The {@link StreamsQuery} used by Rya Streams for this query.
      * @throws RyaStreamsException The query could not be added to Rya Streams.
      */
-    public StreamsQuery addQuery(final String query, boolean isActive) throws RyaStreamsException;
+    public StreamsQuery addQuery(final String query, boolean isActive, boolean isInsert) throws RyaStreamsException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
index edd90fd..67edec0 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQuery.java
@@ -49,7 +49,7 @@ public class DefaultAddQuery implements AddQuery {
     }
 
     @Override
-    public StreamsQuery addQuery(final String query, final boolean isActive) throws RyaStreamsException {
+    public StreamsQuery addQuery(final String query, final boolean isActive, final boolean isInsert) throws RyaStreamsException {
         requireNonNull(query);
 
         // Make sure the SPARQL is valid.
@@ -60,6 +60,6 @@ public class DefaultAddQuery implements AddQuery {
         }
 
         // If it is, then store it in the repository.
-        return repository.add(query, isActive);
+        return repository.add(query, isActive, isInsert);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index 95c1922..c71f0f8 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -92,7 +92,7 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
     }
 
     @Override
-    public StreamsQuery add(final String query, final boolean isActive)
+    public StreamsQuery add(final String query, final boolean isActive, final boolean isInsert)
             throws QueryRepositoryException, IllegalStateException {
         requireNonNull(query);
 
@@ -101,7 +101,7 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
             checkState();
             // First record the change to the log.
             final UUID queryId = UUID.randomUUID();
-            final QueryChange change = QueryChange.create(queryId, query, isActive);
+            final QueryChange change = QueryChange.create(queryId, query, isActive, isInsert);
             changeLog.write(change);
 
             // Update the cache to represent what is currently in the log.
@@ -235,7 +235,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
                         final StreamsQuery query = new StreamsQuery(
                                 queryId,
                                 change.getSparql().get(),
-                                change.getIsActive().get());
+                                change.getIsActive().get(),
+                                change.getIsInsert().get());
                         queriesCache.put(queryId, query);
                         break;
 
@@ -245,7 +246,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements
                             final StreamsQuery updated = new StreamsQuery(
                                     old.getQueryId(),
                                     old.getSparql(),
-                                    change.getIsActive().get());
+                                    change.getIsActive().get(),
+                                    old.isInsert());
                             queriesCache.put(queryId, updated);
                         }
                         break;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index d34a394..fb58844 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -42,6 +42,7 @@ public final class QueryChange implements Serializable {
     private final ChangeType changeType;
     private final Optional<String> sparql;
     private final Optional<Boolean> isActive;
+    private final Optional<Boolean> isInsert;
 
     /**
      * Constructs an instance of {@link QueryChange}. Use the {@link #create(UUID, String)} or {@link #delete(UUID)}
@@ -52,16 +53,20 @@ public final class QueryChange implements Serializable {
      * @param sparql - If this is a create change, then the SPARQL query that will be evaluated within Rya Streams. (not null)
      * @param isActive - If this is a create or update change, then the active state that defines if the
      *   query will be evaluated by RyaStreams. (not null)
+     * @param isInsert - If this is a create change, then the insert state that defines if the
+     *   results of the query will be inserted back into the originating Rya store. (not null)
      */
     private QueryChange(
             final UUID queryId,
             final ChangeType changeType,
             final Optional<String> sparql,
-            final Optional<Boolean> isActive) {
+            final Optional<Boolean> isActive,
+            final Optional<Boolean> isInsert) {
         this.queryId = requireNonNull(queryId);
         this.changeType = requireNonNull(changeType);
         this.sparql = requireNonNull(sparql);
         this.isActive = requireNonNull(isActive);
+        this.isInsert = requireNonNull(isInsert);
     }
 
     /**
@@ -93,9 +98,17 @@ public final class QueryChange implements Serializable {
         return isActive;
     }
 
+    /**
+     * @return If this is a create change, then the insert state that defines if the
+     *   results of the query will be inserted back into the originating Rya store.
+     */
+    public Optional<Boolean> getIsInsert() {
+        return isInsert;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(queryId, changeType, sparql, isActive);
+        return Objects.hash(queryId, changeType, sparql, isActive, isInsert);
     }
 
     @Override
@@ -105,7 +118,8 @@ public final class QueryChange implements Serializable {
             return Objects.equals(queryId, change.queryId) &&
                     Objects.equals(changeType, change.changeType) &&
                     Objects.equals(sparql, change.sparql) &&
-                    Objects.equals(isActive, change.isActive);
+                    Objects.equals(isActive, change.isActive) &&
+                    Objects.equals(isInsert, change.isInsert);
         }
         return false;
     }
@@ -116,6 +130,7 @@ public final class QueryChange implements Serializable {
                "    Query ID: " + queryId + ",\n" +
                "    Change Type: " + changeType + ",\n" +
                "    Is Active: " + isActive + ",\n" +
+               "    Is Insert: " + isInsert + ",\n" +
                "    SPARQL: " + sparql + "\n" +
                "}";
     }
@@ -125,22 +140,24 @@ public final class QueryChange implements Serializable {
      *
      * @param queryId - Uniquely identifies the query within the streaming system. (not null)
      * @param sparql - The query that will be evaluated. (not null)
-     * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null)
+     * @param isActive - The active state that defines if the query will be evaluated by RyaStreams.
+     * @param isInsert - The insert state that defines if the query's results will be inserted back
+     *   into the Rya instance the originating statements came from.
      * @return A {@link QueryChange} built using the provided values.
      */
-    public static QueryChange create(final UUID queryId, final String sparql, final boolean isActive) {
-        return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql), Optional.of(isActive));
+    public static QueryChange create(final UUID queryId, final String sparql, final boolean isActive, final boolean isInsert) {
+        return new QueryChange(queryId, ChangeType.CREATE, Optional.of(sparql), Optional.of(isActive), Optional.of(isInsert));
     }
 
     /**
      * Create a {@link QueryChange} that represents a query in Rya Streams whose active state has changed.
      *
      * @param queryId - Uniquely identifies the query within the streaming system. (not null)
-     * @param isActive - The active state that defines if the query will be evaluated by RyaStreams. (not null)
+     * @param isActive - The active state that defines if the query will be evaluated by RyaStreams.
      * @return A {@link QueryChange} built using the provided values.
      */
     public static QueryChange update(final UUID queryId, final boolean isActive) {
-        return new QueryChange(queryId, ChangeType.UPDATE, Optional.absent(), Optional.of(isActive));
+        return new QueryChange(queryId, ChangeType.UPDATE, Optional.absent(), Optional.of(isActive), Optional.absent());
     }
 
     /**
@@ -150,7 +167,7 @@ public final class QueryChange implements Serializable {
      * @return A {@link QueryChange} built using the provided values.
      */
     public static QueryChange delete(final UUID queryId) {
-        return new QueryChange(queryId, ChangeType.DELETE, Optional.absent(), Optional.absent());
+        return new QueryChange(queryId, ChangeType.DELETE, Optional.absent(), Optional.absent(), Optional.absent());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index 4d8b2db..e4bcd7f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -46,11 +46,13 @@ public interface QueryRepository extends Service {
      * @param query - The SPARQL query to add. (not null)
      * @param isActive - {@code true} if the query should be processed after it is added
      *   otherwise {@code false}.
+     * @param isInsert - {@code true} if the query's results should be inserted back into
+     *   the Rya instance the originating statements came from; otherwise {@code false}.
      * @return The {@link StreamsQuery} used in Rya Streams.
      * @throws QueryRepositoryException Could not add the query.
      * @throws IllegalStateException The Service has not been started, but has been subscribed to.
      */
-    public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException, IllegalStateException;
+    public StreamsQuery add(final String query, boolean isActive, boolean isInsert) throws QueryRepositoryException, IllegalStateException;
 
     /**
      * Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
index 77a0a15..1a6ea88 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/interactor/defaults/DefaultAddQueryTest.java
@@ -43,10 +43,10 @@ public class DefaultAddQueryTest {
         final AddQuery addQuery = new DefaultAddQuery(repo);
 
         // Add the query.
-        addQuery.addQuery(sparql, true);
+        addQuery.addQuery(sparql, true, true);
 
         // Verify the call was forwarded to the repository.
-        verify(repo, times(1)).add(eq(sparql), eq(true));
+        verify(repo, times(1)).add(eq(sparql), eq(true), eq(true));
     }
 
     @Test(expected = RyaStreamsException.class)
@@ -59,6 +59,6 @@ public class DefaultAddQueryTest {
         final AddQuery addQuery = new DefaultAddQuery(repo);
 
         // Add the query.
-        addQuery.addQuery(sparql, true);
+        addQuery.addQuery(sparql, true, true);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index d7e116b..5a16e79 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -50,9 +50,9 @@ public class InMemoryQueryRepositoryTest {
         final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
         // Add some queries to it.
         final Set<StreamsQuery> expected = new HashSet<>();
-        expected.add( queries.add("query 1", true) );
-        expected.add( queries.add("query 2", false) );
-        expected.add( queries.add("query 3", true) );
+        expected.add( queries.add("query 1", true, true) );
+        expected.add( queries.add("query 2", false, true) );
+        expected.add( queries.add("query 3", true, false) );
 
         // Show they are in the list of all queries.
         final Set<StreamsQuery> stored = queries.list();
@@ -65,9 +65,9 @@ public class InMemoryQueryRepositoryTest {
         final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
         // Add some queries to it. The second one we will delete.
         final Set<StreamsQuery> expected = new HashSet<>();
-        expected.add( queries.add("query 1", true) );
-        final UUID deletedMeId = queries.add("query 2", false).getQueryId();
-        expected.add( queries.add("query 3", true) );
+        expected.add( queries.add("query 1", true, true) );
+        final UUID deletedMeId = queries.add("query 2", false, true).getQueryId();
+        expected.add( queries.add("query 3", true, false) );
 
         // Delete the second query.
         queries.delete( deletedMeId );
@@ -86,16 +86,20 @@ public class InMemoryQueryRepositoryTest {
             queries.startAndWait();
             // Add some queries and deletes to it.
             final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1", true) );
-            final UUID deletedMeId = queries.add("query 2", false).getQueryId();
-            expected.add( queries.add("query 3", true) );
+            expected.add( queries.add("query 1", true, true) );
+            final UUID deletedMeId = queries.add("query 2", false, true).getQueryId();
+            expected.add( queries.add("query 3", true, false) );
             queries.delete( deletedMeId );
 
             // Create a new totally in memory QueryRepository.
             final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
-            // Listing the queries should work using an initialized change log.
-            final Set<StreamsQuery> stored = initializedQueries.list();
-            assertEquals(expected, stored);
+            try {
+                // Listing the queries should work using an initialized change log.
+                final Set<StreamsQuery> stored = initializedQueries.list();
+                assertEquals(expected, stored);
+            } finally {
+                queries.stop();
+            }
         } finally {
             queries.stop();
         }
@@ -117,7 +121,7 @@ public class InMemoryQueryRepositoryTest {
         // Setup a totally in memory QueryRepository.
         final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
         // Add a query to it.
-        final StreamsQuery query = queries.add("query 1", true);
+        final StreamsQuery query = queries.add("query 1", true, false);
 
         // Show the fetched query matches the expected ones.
         final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
@@ -140,14 +144,14 @@ public class InMemoryQueryRepositoryTest {
         // Setup a totally in memory QueryRepository.
         final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
         // Add a query to it.
-        final StreamsQuery query = queries.add("query 1", true);
+        final StreamsQuery query = queries.add("query 1", true, false);
 
         // Change the isActive state of that query.
         queries.updateIsActive(query.getQueryId(), false);
 
         // Show the fetched query matches the expected one.
         final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
-        final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false);
+        final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false, false);
         assertEquals(expected, fetched.get());
     }
 
@@ -159,13 +163,13 @@ public class InMemoryQueryRepositoryTest {
             queries.startAndWait();
 
             // Add a query to it.
-            final StreamsQuery query = queries.add("query 1", true);
+            final StreamsQuery query = queries.add("query 1", true, false);
 
             final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> {
                 final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(1L,
-                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
-                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
 
                 assertEquals(expected, queryChangeEvent);
                 assertEquals(expectedQueryState, newQueryState);
@@ -173,7 +177,7 @@ public class InMemoryQueryRepositoryTest {
 
             assertEquals(Sets.newHashSet(query), existing);
 
-            queries.add("query 2", true);
+            queries.add("query 2", true, false);
         } finally {
             queries.stop();
         }
@@ -194,9 +198,9 @@ public class InMemoryQueryRepositoryTest {
             final CountDownLatch repo1Latch = new CountDownLatch(1);
             queries.subscribe((queryChangeEvent, newQueryState) -> {
                 final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
-                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
-                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
 
                 assertEquals(expected, queryChangeEvent);
                 assertEquals(expectedQueryState, newQueryState);
@@ -207,16 +211,16 @@ public class InMemoryQueryRepositoryTest {
             final CountDownLatch repo2Latch = new CountDownLatch(1);
             queries2.subscribe((queryChangeEvent, newQueryState) -> {
                 final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L,
-                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                        QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
                 final Optional<StreamsQuery> expectedQueryState = Optional.of(
-                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                        new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true, false));
 
                 assertEquals(expected, queryChangeEvent);
                 assertEquals(expectedQueryState, newQueryState);
                 repo2Latch.countDown();
             });
 
-            queries.add("query 2", true);
+            queries.add("query 2", true, false);
 
             assertTrue(repo1Latch.await(5, TimeUnit.SECONDS));
             assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
@@ -233,6 +237,6 @@ public class InMemoryQueryRepositoryTest {
         final QueryRepository queries = new InMemoryQueryRepository(new InMemoryQueryChangeLog(), SCHEDULE);
         queries.subscribe((queryChangeEvent, newQueryState) -> {});
 
-        queries.add("query 2", true);
+        queries.add("query 2", true, false);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index 9273c33..3886a95 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.rya.api.utils.QueryInvestigator;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.AddQuery;
@@ -32,6 +33,7 @@ import org.apache.rya.streams.api.queries.QueryRepository;
 import org.apache.rya.streams.client.RyaStreamsCommand;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.openrdf.query.MalformedQueryException;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -58,6 +60,9 @@ public class AddQueryCommand implements RyaStreamsCommand {
         @Parameter(names = {"--isActive", "-a"}, required = false, description = "True if the added query will be started.")
         private String isActive;
 
+        @Parameter(names = {"--isInsert", "-n"}, required = false, description = "True if the reuslts of the query will be written back to Rya.")
+        private String isInsert;
+
         @Override
         public String toString() {
             final StringBuilder parameters = new StringBuilder();
@@ -67,6 +72,7 @@ public class AddQueryCommand implements RyaStreamsCommand {
                 parameters.append("\tQuery: " + query + "\n");
             }
             parameters.append("\tIs Active: " + isActive + "\n");
+            parameters.append("\tis Insert: " + isInsert + "\n");
             return parameters.toString();
         }
     }
@@ -125,17 +131,33 @@ public class AddQueryCommand implements RyaStreamsCommand {
         try {
             final AddQuery addQuery = new DefaultAddQuery(queryRepo);
             try {
-                final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive));
+                final Boolean isActive = Boolean.parseBoolean(params.isActive);
+                final Boolean isInsert = Boolean.parseBoolean(params.isInsert);
+
+                // If the query's results are meant to be written back to Rya, make sure it creates statements.
+                if(isInsert) {
+                    final boolean isConstructQuery = QueryInvestigator.isConstruct(params.query);
+                    final boolean isInsertQuery = QueryInvestigator.isInsertWhere(params.query);
+
+                    if(isConstructQuery) {
+                        System.out.println(
+                                "WARNING: CONSTRUCT is part of the SPARQL Query API, so they do not normally\n" +
+                                "get written back to the triple store. Consider using an INSERT, which is\n" +
+                                "part of the SPARQL Update API, in the future.");
+                    }
+
+                    if(!(isConstructQuery || isInsertQuery)) {
+                        throw new ArgumentsException("Only CONSTRUCT queries and INSERT updates may be inserted back to the triple store.");
+                    }
+                }
+
+                final StreamsQuery query = addQuery.addQuery(params.query, isActive, isInsert);
                 System.out.println("Added query: " + query.getSparql());
             } catch (final RyaStreamsException e) {
-                System.err.println("Unable to parse query: " + params.query);
-                e.printStackTrace();
-                System.exit(1);
+                throw new ExecutionException("Unable to add the query to Rya Streams.", e);
             }
-        } catch (final Exception e) {
-            System.err.println("Problem encountered while closing the QueryRepository.");
-            e.printStackTrace();
-            System.exit(1);
+        } catch(final MalformedQueryException e) {
+            throw new ArgumentsException("Could not parse the provided query.", e);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index a5507a6..3639aa9 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -23,7 +23,6 @@ import static java.util.Objects.requireNonNull;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.ListQueries;
@@ -116,9 +115,11 @@ public class ListQueriesCommand implements RyaStreamsCommand {
             sb.append("ID: ").append(query.getQueryId())
                 .append("    ")
                 .append("Is Active: ").append(query.isActive())
-                .append(StringUtils.rightPad("" + query.isActive(), 9))
+                .append( query.isActive() ? "     " : "    " )
+                .append("Is Insert: ").append(query.isInsert())
+                .append(query.isInsert() ? "     " : "    ")
                 .append("Query: ").append(query.getSparql()).append("\n");
         });
         return sb.toString();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 3bfbadc..bbbcb2a 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -33,6 +33,7 @@ import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
 import org.apache.rya.streams.api.queries.QueryChange;
 import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.client.RyaStreamsCommand.ArgumentsException;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
 import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
@@ -78,7 +79,8 @@ public class AddQueryCommandIT {
                 "-i", kafka.getKafkaHostname(),
                 "-p", kafka.getKafkaPort(),
                 "-q", query,
-                "-a", "true"
+                "-a", "true",
+                "-n", "false"
         };
 
         // Execute the command.
@@ -100,7 +102,8 @@ public class AddQueryCommandIT {
                 "--kafkaHostname", kafka.getKafkaHostname(),
                 "--kafkaPort", kafka.getKafkaPort(),
                 "--query", query,
-                "--isActive", "true"
+                "--isActive", "true",
+                "--isInsert", "false"
         };
 
         // Execute the command.
@@ -112,4 +115,22 @@ public class AddQueryCommandIT {
         assertEquals(1, queries.size());
         assertEquals(query, queries.iterator().next().getSparql());
     }
+
+    @Test(expected = ArgumentsException.class)
+    public void canNotInsertQueries() throws Exception {
+        // Arguments that add a query to Rya Streams.
+        final String query = "SELECT * WHERE { ?person <urn:name> ?name }";
+        final String[] args = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--query", query,
+                "--isActive", "true",
+                "--isInsert", "true"
+        };
+
+        // Execute the command.
+        final AddQueryCommand command = new AddQueryCommand();
+        command.execute(args);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 7bec080..4cfdedb 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -74,9 +74,9 @@ public class DeleteQueryCommandIT {
     @Test
     public void shortParams() throws Exception {
         // Add a few queries to Rya Streams.
-        queryRepo.add("query1", true);
-        final UUID query2Id = queryRepo.add("query2", false).getQueryId();
-        queryRepo.add("query3", true);
+        queryRepo.add("query1", true, true);
+        final UUID query2Id = queryRepo.add("query2", false, true).getQueryId();
+        queryRepo.add("query3", true, false);
 
         // Show that all three of the queries were added.
         Set<StreamsQuery> queries = queryRepo.list();
@@ -105,9 +105,9 @@ public class DeleteQueryCommandIT {
     @Test
     public void longParams() throws Exception {
         // Add a few queries to Rya Streams.
-        queryRepo.add("query1", true);
-        final UUID query2Id = queryRepo.add("query2", false).getQueryId();
-        queryRepo.add("query3", true);
+        queryRepo.add("query1", true, true);
+        final UUID query2Id = queryRepo.add("query2", false, true).getQueryId();
+        queryRepo.add("query3", true, false);
 
         // Show that all three of the queries were added.
         Set<StreamsQuery> queries = queryRepo.list();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index f6ceb75..e9f961a 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -68,9 +68,9 @@ public class ListQueryCommandIT {
     @Test
     public void shortParams() throws Exception {
         // Add a few queries to Rya Streams.
-        queryRepo.add("query1", true);
-        queryRepo.add("query2", false);
-        queryRepo.add("query3", true);
+        queryRepo.add("query1", true, true);
+        queryRepo.add("query2", false, true);
+        queryRepo.add("query3", true, false);
 
         // Execute the List Queries command.
         final String[] args = new String[] {
@@ -86,9 +86,9 @@ public class ListQueryCommandIT {
     @Test
     public void longParams() throws Exception {
         // Add a few queries to Rya Streams.
-        queryRepo.add("query1", true);
-        queryRepo.add("query2", false);
-        queryRepo.add("query3", true);
+        queryRepo.add("query1", true, true);
+        queryRepo.add("query2", false, true);
+        queryRepo.add("query3", true, false);
 
         // Execute the List Queries command.
         final String[] args = new String[] {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 176b920..21a8e4c 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -116,7 +116,7 @@ public class RunQueryCommandIT {
     @Test
     public void runQuery() throws Exception {
         // Register a query with the Query Repository.
-        final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+        final StreamsQuery sQuery = queryRepo.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true, false);
 
         // Arguments that run the query we just registered with Rya Streams.
         final String[] args = new String[] {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index c9abb41..4459057 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -88,7 +88,7 @@ public class KafkaRunQueryIT {
         final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS) );
 
         // Add the query to the query repository.
-        final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+        final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true, false);
         final UUID queryId = sQuery.getQueryId();
         final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
index c2b821f..0dcd079 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -79,7 +79,7 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
     public void testWrite() throws Exception {
         final String sparql = "SOME QUERY HERE";
         final UUID uuid = UUID.randomUUID();
-        final QueryChange newChange = QueryChange.create(uuid, sparql, true);
+        final QueryChange newChange = QueryChange.create(uuid, sparql, true, false);
         changeLog.write(newChange);
 
         consumer.subscribe(Lists.newArrayList(topic));
@@ -93,7 +93,7 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
     @Test
     public void readSingleWrite() throws Exception {
         // Write a single change to the log.
-        final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+        final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true, false);
         changeLog.write(change);
 
         // Read that entry from the log.
@@ -198,7 +198,7 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
             assertFalse( changeLog2.readFromStart().hasNext() );
 
             // Write a change to the first log.
-            final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true);
+            final QueryChange change = QueryChange.create(UUID.randomUUID(), "query", true, false);
             changeLog.write(change);
 
             // Show it's in the first log.
@@ -214,7 +214,7 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
         for (int ii = 0; ii < 10; ii++) {
             final String sparql = "SOME QUERY HERE_" + ii;
             final UUID uuid = UUID.randomUUID();
-            final QueryChange newChange = QueryChange.create(uuid, sparql, true);
+            final QueryChange newChange = QueryChange.create(uuid, sparql, true, false);
             changeLog.write(newChange);
             changes.add(newChange);
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
index cb708ed..2cb543a 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
@@ -79,13 +79,13 @@ public class LogEventWorkerTest {
 
         // Write a message that indicates a new query should be active.
         final UUID firstQueryId = UUID.randomUUID();
-        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true, false));
 
         // Write a message that adds an active query, but then makes it inactive. Because both of these
         // events are written to the log before the worker subscribes to the repository for updates, they
         // must result in a single query stopped event.
         final UUID secondQueryId = UUID.randomUUID();
-        changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true));
+        changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true, false));
         changeLog.write(QueryChange.update(secondQueryId, false));
 
         // Start the worker that will be tested.
@@ -103,7 +103,7 @@ public class LogEventWorkerTest {
             // Query 2, stopped.
             Set<QueryEvent> expectedEvents = new HashSet<>();
             expectedEvents.add(QueryEvent.executing("rya",
-                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true, false)));
             expectedEvents.add(QueryEvent.stopped("rya", secondQueryId));
 
             Set<QueryEvent> queryEvents = new HashSet<>();
@@ -146,7 +146,7 @@ public class LogEventWorkerTest {
 
         // Write a message that indicates a new query should be active.
         final UUID firstQueryId = UUID.randomUUID();
-        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true));
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true, false));
 
         // Start the worker that will be tested.
         final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue,
@@ -165,7 +165,7 @@ public class LogEventWorkerTest {
             // second message was effectively skipped as it would have add its work added twice otherwise.
             final Set<QueryEvent> expectedEvents = new HashSet<>();
             expectedEvents.add(QueryEvent.executing("rya",
-                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true)));
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true, false)));
 
             final Set<QueryEvent> queryEvents = new HashSet<>();
             queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) );

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
index 4495e19..ea6a37b 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
@@ -87,9 +87,9 @@ public class QueryEventWorkGeneratorTest {
 
         // A thread that will attempt to notify the generator with a created query.
         final UUID queryId = UUID.randomUUID();
-        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true, false);
         final Thread notifyThread = new Thread(() -> {
-            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive(), query.isInsert());
             final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
             generator.notify(entry, Optional.of(query));
         });
@@ -108,7 +108,7 @@ public class QueryEventWorkGeneratorTest {
 
             // Show work was added to the queue and the notifying thread died.
             final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
-            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive(), query.isInsert()));
             assertEquals(expected, event);
         } finally {
             shutdownSignal.set(true);
@@ -132,9 +132,9 @@ public class QueryEventWorkGeneratorTest {
 
         // A thread that will attempt to notify the generator with a created query.
         final UUID queryId = UUID.randomUUID();
-        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true, false);
         final Thread notifyThread = new Thread(() -> {
-            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive());
+            final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive(), query.isInsert());
             final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
             generator.notify(entry, Optional.of(query));
         });
@@ -145,7 +145,7 @@ public class QueryEventWorkGeneratorTest {
         try {
             // Show work was added to the queue and the notifying thread died.
             final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
-            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive(), query.isInsert()));
             assertEquals(expected, event);
         } finally {
             shutdownSignal.set(true);
@@ -205,7 +205,7 @@ public class QueryEventWorkGeneratorTest {
 
         // A thread that will attempt to notify the generator with an update query change.
         final UUID queryId = UUID.randomUUID();
-        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true, false);
         final Thread notifyThread = new Thread(() -> {
             final QueryChange change = QueryChange.update(queryId, true);
             final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);
@@ -218,7 +218,7 @@ public class QueryEventWorkGeneratorTest {
         try {
             // Show work was added to the queue and the notifying thread died.
             final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
-            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive(), query.isInsert()));
             assertEquals(expected, event);
         } finally {
             shutdownSignal.set(true);
@@ -242,7 +242,7 @@ public class QueryEventWorkGeneratorTest {
 
         // A thread that will attempt to notify the generator with an update query change.
         final UUID queryId = UUID.randomUUID();
-        final StreamsQuery query = new StreamsQuery(queryId, "query", false);
+        final StreamsQuery query = new StreamsQuery(queryId, "query", false, false);
         final Thread notifyThread = new Thread(() -> {
             final QueryChange change = QueryChange.update(queryId, false);
             final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
index 33c0719..95c7a54 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
@@ -68,7 +68,7 @@ public class QueryEventWorkerTest {
 
         // The message that indicates a query needs to be executed.
         final String ryaInstance = "rya";
-        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true);
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true, false);
         final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, query);
 
         // Release a latch if the startQuery method on the queryExecutor is invoked with the correct values.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index 04e70c0..f1c9e0f 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -50,7 +50,7 @@ public class QueryManagerTest {
         //The new QueryChangeLog
         final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
         final String ryaInstance = "ryaTestInstance";
-        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true, false);
 
         // when the query executor is told to start the test query on the test
         // rya instance, count down on the countdown latch
@@ -69,7 +69,7 @@ public class QueryManagerTest {
             //The listener created by the Query Manager
             final SourceListener listener = (SourceListener) invocation.getArguments()[0];
             listener.notifyCreate(ryaInstance, newChangeLog);
-            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive(), query.isInsert()));
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
@@ -91,7 +91,7 @@ public class QueryManagerTest {
     public void testDeleteQuery() throws Exception {
         //The new QueryChangeLog
         final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
-        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true, false);
         final String ryaInstance = "ryaTestInstance";
 
         // when the query executor is told to start the test query on the test
@@ -121,7 +121,7 @@ public class QueryManagerTest {
             final SourceListener listener = (SourceListener) invocation.getArguments()[0];
             listener.notifyCreate(ryaInstance, newChangeLog);
             Thread.sleep(1000);
-            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive(), query.isInsert()));
             queryStarted.await(5, TimeUnit.SECONDS);
             newChangeLog.write(QueryChange.delete(query.getQueryId()));
             return null;
@@ -145,7 +145,7 @@ public class QueryManagerTest {
     public void testUpdateQuery() throws Exception {
         // The new QueryChangeLog
         final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog();
-        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true);
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true, false);
         final String ryaInstance = "ryaTestInstance";
 
         // when the query executor is told to start the test query on the test
@@ -176,7 +176,7 @@ public class QueryManagerTest {
             final SourceListener listener = (SourceListener) invocation.getArguments()[0];
             listener.notifyCreate(ryaInstance, newChangeLog);
             Thread.sleep(1000);
-            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive()));
+            newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive(), query.isInsert()));
             queryStarted.await(5, TimeUnit.SECONDS);
             newChangeLog.write(QueryChange.update(query.getQueryId(), false));
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index 6358104..83f040d 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -86,7 +86,7 @@ public class LocalQueryExecutorIT {
     public void runQuery() throws Exception {
         // Test values.
         final String ryaInstance = "rya";
-        final StreamsQuery sQuery = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);
+        final StreamsQuery sQuery = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?person <urn:worksAt> ?business . }", true, false);
 
         // Create the statements that will be loaded.
         final ValueFactory vf = new ValueFactoryImpl();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index c0f888e..efbcf4b 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -46,14 +46,14 @@ public class LocalQueryExecutorTest {
     @Test(expected = IllegalStateException.class)
     public void startQuery_serviceNotStarted() throws Exception {
         final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
-        executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true));
+        executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true, false));
     }
 
     @Test
     public void startQuery() throws Exception {
         // Test values.
         final String ryaInstance = "rya";
-        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
 
         // Mock the streams factory so that we can tell if the start function is invoked by the executor.
         final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -97,7 +97,7 @@ public class LocalQueryExecutorTest {
     public void stopQuery() throws Exception {
         // Test values.
         final String ryaInstance = "rya";
-        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
 
         // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
         final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -131,8 +131,8 @@ public class LocalQueryExecutorTest {
     public void stopAll_noneForThatRyaInstance() throws Exception {
         // Test values.
         final String ryaInstance = "rya";
-        final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
-        final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+        final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
 
         // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
         final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -169,9 +169,9 @@ public class LocalQueryExecutorTest {
     public void stopAll() throws Exception {
         // Test values.
         final String ryaInstance1 = "rya1";
-        final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
         final String ryaInstance2 = "rya2";
-        final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
 
         // Mock the streams factory so that we can tell if the stop function is invoked by the executor.
         final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -230,9 +230,9 @@ public class LocalQueryExecutorTest {
     public void getRunningQueryIds_noneStopped() throws Exception {
         // Test values.
         final String ryaInstance = "rya";
-        final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
-        final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
-        final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+        final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+        final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
 
         // Mock the streams factory so that we can figure out what is started.
         final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);
@@ -265,9 +265,9 @@ public class LocalQueryExecutorTest {
     public void getRunningQueryIds_stoppedNoLongerListed() throws Exception {
         // Test values.
         final String ryaInstance = "rya";
-        final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
-        final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
-        final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true);
+        final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+        final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
+        final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true, false);
 
         // Mock the streams factory so that we can figure out what is started.
         final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
index 5f7df84..fede1a9 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaStreamsCommands.java
@@ -28,13 +28,16 @@ import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.RyaClientException;
 import org.apache.rya.api.instance.RyaDetails;
 import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.api.utils.QueryInvestigator;
 import org.apache.rya.shell.SharedShellState.ConnectionState;
+import org.apache.rya.shell.util.ConsolePrinter;
 import org.apache.rya.shell.util.SparqlPrompt;
 import org.apache.rya.shell.util.StreamsQueryFormatter;
 import org.apache.rya.streams.api.RyaStreamsClient;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
+import org.openrdf.query.MalformedQueryException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
@@ -61,19 +64,23 @@ public class RyaStreamsCommands implements CommandMarker {
 
     private final SharedShellState state;
     private final SparqlPrompt sparqlPrompt;
+    private final ConsolePrinter consolePrinter;
 
     /**
      * Constructs an instance of {@link RyaStreamsCommands}.
      *
      * @param state - Holds shared state between all of the command classes. (not null)
      * @param sparqlPrompt - Prompts a user for a SPARQL query. (not null)
+     * @param consolePrinter - Prints messages to the console. (not null)
      */
     @Autowired
     public RyaStreamsCommands(
             final SharedShellState state,
-            final SparqlPrompt sparqlPrompt) {
+            final SparqlPrompt sparqlPrompt,
+            final ConsolePrinter consolePrinter) {
         this.state = requireNonNull(state);
         this.sparqlPrompt = requireNonNull(sparqlPrompt);
+        this.consolePrinter = requireNonNull(consolePrinter);
     }
 
     /**
@@ -172,7 +179,10 @@ public class RyaStreamsCommands implements CommandMarker {
     public String addQuery(
             @CliOption(key = {"inactive"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
                        help = "Setting this flag will add the query, but not run it. (default: false)")
-            final boolean inactive) {
+            final boolean inactive,
+            @CliOption(key = {"insert"}, mandatory = false, unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
+                       help = "Setting this flag will insert the query's results back into Rya. (default: false)")
+            final boolean isInsert) {
         final RyaStreamsClient streamsClient = state.getShellState().getRyaStreamsCommands().get();
 
         // Prompt the user for the SPARQL that defines the query.
@@ -184,10 +194,32 @@ public class RyaStreamsCommands implements CommandMarker {
                 return "";
             }
 
-            final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive);
+            final boolean isConstructQuery = QueryInvestigator.isConstruct(sparql.get());
+            final boolean isInsertQuery = QueryInvestigator.isInsertWhere(sparql.get());
+
+            // If the user wants to insert a CONSTRUCT into Rya, print a warning.
+            if(isInsert && isConstructQuery) {
+                consolePrinter.println("WARNING: CONSTRUCT is part of the SPARQL Query API, so they do not normally");
+                consolePrinter.println("get written back to the triple store. Consider using an INSERT, which is");
+                consolePrinter.println("part of the SPARQL Update API, in the future.");
+            }
+
+            // If the user wants to use an INSERT query, but not insert it back into Rya, suggest using a construct.
+            if(!isInsert && isInsertQuery) {
+                consolePrinter.println("WARNING: INSERT is part of the SPARQL Update API, so they normally get written");
+                consolePrinter.println("back to the triple store. Consider using a CONSTRUCT, which is part of the");
+                consolePrinter.println("SPARQL Query API, in the future.");
+            }
+
+            // If the user wants to insert the query back into Rya, make sure it is a legal query to do that.
+            if(isInsert && !(isConstructQuery || isInsertQuery)) {
+                throw new RuntimeException("Only CONSTRUCT queries and INSERT updates may be inserted back to the triple store.");
+            }
+
+            final StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive, isInsert);
             return "The added query's ID is " + streamsQuery.getQueryId();
 
-        } catch (final IOException | RyaStreamsException e) {
+        } catch (final MalformedQueryException | IOException | RyaStreamsException e) {
             throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/010c6927/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
index 6c06caf..babeec8 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/util/StreamsQueryFormatter.java
@@ -63,6 +63,7 @@ public final class StreamsQueryFormatter {
         final StringBuilder builder = new StringBuilder();
         builder.append(" Query ID: ").append( query.getQueryId() ) .append("\n");
         builder.append("Is Active: ").append( query.isActive() ).append("\n");
+        builder.append("Is Insert: ").append( query.isInsert() ).append("\n");
         builder.append("   SPARQL: ").append( lines[0] ).append("\n");
 
         for(int i = 1; i < lines.length; i++) {