You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 19:35:33 UTC
[2/5] incubator-rya git commit: RYA-246-Query-Export-Strategy. Closes #213.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
index 97e3f22..516690e 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
@@ -26,9 +26,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.data.Bytes;
import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.NodeBin;
@@ -79,12 +78,12 @@ public class PeriodicQueryPruner implements BinPruner, Runnable {
*/
@Override
public void pruneBindingSetBin(NodeBin nodeBin) {
- String id = nodeBin.getNodeId();
+ String pcjId = nodeBin.getNodeId();
long bin = nodeBin.getBin();
try(Snapshot sx = client.newSnapshot()) {
- String queryId = sx.get(Bytes.of(id), FluoQueryColumns.PCJ_ID_QUERY_ID).toString();
+ String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
- accPruner.pruneBindingSetBin(new NodeBin(id, bin));
+ accPruner.pruneBindingSetBin(nodeBin);
for(String fluoId: fluoIds) {
fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin));
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
index 27e06f0..69bd39c 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
@@ -35,6 +35,7 @@ import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.notification.CommandNotification;
@@ -120,7 +121,7 @@ public class PeriodicNotificationProvider {
id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString());
break;
case QUERY:
- id = sx.get(Bytes.of(nodeId), FluoQueryColumns.RYA_PCJ_ID).toString();
+ id = FluoQueryUtils.convertFluoQueryIdToPcjId(nodeId);
break;
case AGGREGATION:
id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
index 9239dc7..8fd95d3 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
@@ -22,8 +22,11 @@ import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.GetInstanceDetails;
import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
import org.apache.rya.api.client.Install.InstallConfiguration;
@@ -280,7 +283,11 @@ public class RyaAdminCommands implements CommandMarker {
}
@CliCommand(value = CREATE_PCJ_CMD, help = "Creates and starts the maintenance of a new PCJ using a Fluo application.")
- public String createPcj() {
+ public String createPcj(
+ @CliOption(key = {"exportToRya"}, mandatory = false, help = "Indicates that results for the query should be exported to a Rya PCJ table.")
+ boolean exportToRya,
+ @CliOption(key = {"exportToKafka"}, mandatory = false, help = "Indicates that results for the query should be exported to a Kafka Topic.")
+ boolean exportToKafka) {
// Fetch the command that is connected to the store.
final ShellState shellState = state.getShellState();
final RyaClient commands = shellState.getConnectedCommands().get();
@@ -290,8 +297,18 @@ public class RyaAdminCommands implements CommandMarker {
// Prompt the user for the SPARQL.
final Optional<String> sparql = sparqlPrompt.getSparql();
if (sparql.isPresent()) {
+ Set<ExportStrategy> strategies = new HashSet<>();
+ if(exportToRya) {
+ strategies.add(ExportStrategy.RYA);
+ }
+ if(exportToKafka) {
+ strategies.add(ExportStrategy.KAFKA);
+ }
+ if(strategies.size() == 0) {
+ return "The user must specify at least one export strategy by setting either exportToRya or exportToKafka to true.";
+ }
// Execute the command.
- final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get());
+ final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get(), strategies);
// Return a message that indicates the ID of the newly created PCJ.
return String.format("The PCJ has been created. Its ID is '%s'.", pcjId);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
index e3e8d98..cab34e9 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Date;
import java.util.List;
+import java.util.Set;
import java.util.TimeZone;
import org.apache.rya.api.client.AddUser;
@@ -43,6 +44,7 @@ import org.apache.rya.api.client.RemoveUser;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.client.Uninstall;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
@@ -61,6 +63,7 @@ import org.junit.Test;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* Unit tests the methods of {@link RyaAdminCommands}.
@@ -74,7 +77,8 @@ public class RyaAdminCommandsTest {
final String sparql = "SELECT * WHERE { ?person <http://isA> ?noun }";
final String pcjId = "123412342";
final CreatePCJ mockCreatePCJ = mock(CreatePCJ.class);
- when(mockCreatePCJ.createPCJ( eq(instanceName), eq(sparql) ) ).thenReturn( pcjId );
+ final Set<ExportStrategy> strategies = Sets.newHashSet(ExportStrategy.RYA);
+ when(mockCreatePCJ.createPCJ( eq(instanceName), eq(sparql), eq(strategies) ) ).thenReturn( pcjId );
final RyaClient mockCommands = mock(RyaClient.class);
when(mockCommands.getCreatePCJ()).thenReturn( mockCreatePCJ );
@@ -88,10 +92,10 @@ public class RyaAdminCommandsTest {
// Execute the command.
final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mockSparqlPrompt, mock(UninstallPrompt.class));
- final String message = commands.createPcj();
+ final String message = commands.createPcj(true, false);
// Verify the values that were provided to the command were passed through to CreatePCJ.
- verify(mockCreatePCJ).createPCJ(eq(instanceName), eq(sparql));
+ verify(mockCreatePCJ).createPCJ(eq(instanceName), eq(sparql), eq(strategies));
// Verify a message is returned that explains what was created.
final String expected = "The PCJ has been created. Its ID is '123412342'.";
@@ -114,7 +118,7 @@ public class RyaAdminCommandsTest {
// Execute the command.
final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mockSparqlPrompt, mock(UninstallPrompt.class));
- final String message = commands.createPcj();
+ final String message = commands.createPcj(true, false);
// Verify a message is returned that explains what was created.
final String expected = "";