You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 19:35:33 UTC

[2/5] incubator-rya git commit: RYA-246-Query-Export-Strategy. Closes #213.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
index 97e3f22..516690e 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
@@ -26,9 +26,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.NodeBin;
@@ -79,12 +78,12 @@ public class PeriodicQueryPruner implements BinPruner, Runnable {
      */
     @Override
     public void pruneBindingSetBin(NodeBin nodeBin) {
-        String id = nodeBin.getNodeId();
+        String pcjId = nodeBin.getNodeId();
         long bin = nodeBin.getBin();
         try(Snapshot sx = client.newSnapshot()) {
-            String queryId = sx.get(Bytes.of(id), FluoQueryColumns.PCJ_ID_QUERY_ID).toString();
+            String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
             Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
-            accPruner.pruneBindingSetBin(new NodeBin(id, bin));
+            accPruner.pruneBindingSetBin(nodeBin);
             for(String fluoId: fluoIds) {
                 fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin));
             }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
index 27e06f0..69bd39c 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
@@ -35,6 +35,7 @@ import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.notification.CommandNotification;
@@ -120,7 +121,7 @@ public class PeriodicNotificationProvider {
             id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString());
             break;
         case QUERY:
-            id = sx.get(Bytes.of(nodeId), FluoQueryColumns.RYA_PCJ_ID).toString();
+            id = FluoQueryUtils.convertFluoQueryIdToPcjId(nodeId);
             break;
         case AGGREGATION: 
             id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
index 9239dc7..8fd95d3 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
@@ -22,8 +22,11 @@ import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.GetInstanceDetails;
 import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
 import org.apache.rya.api.client.Install.InstallConfiguration;
@@ -280,7 +283,11 @@ public class RyaAdminCommands implements CommandMarker {
     }
 
     @CliCommand(value = CREATE_PCJ_CMD, help = "Creates and starts the maintenance of a new PCJ using a Fluo application.")
-    public String createPcj() {
+    public String createPcj(
+            @CliOption(key = {"exportToRya"}, mandatory = false, help = "Indicates that results for the query should be exported to a Rya PCJ table.")
+            boolean exportToRya,
+            @CliOption(key = {"exportToKafka"}, mandatory = false, help = "Indicates that results for the query should be exported to a Kafka Topic.")
+            boolean exportToKafka) {
         // Fetch the command that is connected to the store.
         final ShellState shellState = state.getShellState();
         final RyaClient commands = shellState.getConnectedCommands().get();
@@ -290,8 +297,18 @@ public class RyaAdminCommands implements CommandMarker {
             // Prompt the user for the SPARQL.
             final Optional<String> sparql = sparqlPrompt.getSparql();
             if (sparql.isPresent()) {
+                Set<ExportStrategy> strategies = new HashSet<>();
+                if(exportToRya) {
+                    strategies.add(ExportStrategy.RYA);
+                }
+                if(exportToKafka) {
+                    strategies.add(ExportStrategy.KAFKA);
+                }
+                if(strategies.size() == 0) {
+                    return "The user must specify at least one export strategy by setting either exportToRya or exportToKafka to true."; 
+                }
                 // Execute the command.
-                final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get());
+                final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get(), strategies);
                 // Return a message that indicates the ID of the newly created ID.
                 return String.format("The PCJ has been created. Its ID is '%s'.", pcjId);
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
index e3e8d98..cab34e9 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.Date;
 import java.util.List;
+import java.util.Set;
 import java.util.TimeZone;
 
 import org.apache.rya.api.client.AddUser;
@@ -43,6 +44,7 @@ import org.apache.rya.api.client.RemoveUser;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.RyaClientException;
 import org.apache.rya.api.client.Uninstall;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.instance.RyaDetails;
 import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
@@ -61,6 +63,7 @@ import org.junit.Test;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Unit tests the methods of {@link RyaAdminCommands}.
@@ -74,7 +77,8 @@ public class RyaAdminCommandsTest {
         final String sparql = "SELECT * WHERE { ?person <http://isA> ?noun }";
         final String pcjId = "123412342";
         final CreatePCJ mockCreatePCJ = mock(CreatePCJ.class);
-        when(mockCreatePCJ.createPCJ( eq(instanceName), eq(sparql) ) ).thenReturn( pcjId );
+        final Set<ExportStrategy> strategies = Sets.newHashSet(ExportStrategy.RYA);
+        when(mockCreatePCJ.createPCJ( eq(instanceName), eq(sparql), eq(strategies) ) ).thenReturn( pcjId );
 
         final RyaClient mockCommands = mock(RyaClient.class);
         when(mockCommands.getCreatePCJ()).thenReturn( mockCreatePCJ );
@@ -88,10 +92,10 @@ public class RyaAdminCommandsTest {
 
         // Execute the command.
         final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mockSparqlPrompt, mock(UninstallPrompt.class));
-        final String message = commands.createPcj();
+        final String message = commands.createPcj(true, false);
 
         // Verify the values that were provided to the command were passed through to CreatePCJ.
-        verify(mockCreatePCJ).createPCJ(eq(instanceName), eq(sparql));
+        verify(mockCreatePCJ).createPCJ(eq(instanceName), eq(sparql), eq(strategies));
 
         // Verify a message is returned that explains what was created.
         final String expected = "The PCJ has been created. Its ID is '123412342'.";
@@ -114,7 +118,7 @@ public class RyaAdminCommandsTest {
 
         // Execute the command.
         final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mockSparqlPrompt, mock(UninstallPrompt.class));
-        final String message = commands.createPcj();
+        final String message = commands.createPcj(true, false);
 
         // Verify a message is returned that explains what was created.
         final String expected = "";