You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 19:35:34 UTC
[3/5] incubator-rya git commit: RYA-246-Query-Export-Strategy. Closes
#213.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 65db02c..17ab14f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -27,7 +27,8 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
@@ -92,9 +93,9 @@ public class FluoQuery {
this.filterMetadata = requireNonNull(filterMetadata);
this.joinMetadata = requireNonNull(joinMetadata);
if(constructMetadata.isPresent()) {
- this.type = QueryType.Construct;
+ this.type = QueryType.CONSTRUCT;
} else {
- this.type = QueryType.Projection;
+ this.type = QueryType.PROJECTION;
}
}
@@ -568,8 +569,9 @@ public class FluoQuery {
/**
* @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
+ * @throws UnsupportedQueryException
*/
- public FluoQuery build() {
+ public FluoQuery build() throws UnsupportedQueryException {
checkArgument((projectionBuilders.size() > 0 || constructBuilder != null));
Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
@@ -603,12 +605,18 @@ public class FluoQuery {
aggregateMetadata.put(entry.getKey(), entry.getValue().build());
}
+ QueryMetadata qMetadata = queryBuilder.build();
+
if(constructBuilder != null) {
if(periodicQueryMetadata != null) {
- throw new IllegalArgumentException("Queries containing sliding window filters and construct query patterns are not supported.");
+ throw new UnsupportedQueryException("Queries containing sliding window filters and construct query patterns are not supported.");
}
- return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ return new FluoQuery(qMetadata, projectionMetadata.build(), Optional.of(constructBuilder.build()), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
} else {
+ if(aggregationBuilders.size() > 0 && qMetadata.getQueryType() == QueryType.PROJECTION && qMetadata.getExportStrategies().contains(ExportStrategy.RYA)) {
+ throw new UnsupportedQueryException("Exporting to Rya PCJ tables is currently not supported for queries containing aggregations.");
+ }
+
return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 2eae4ff..8569a48 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -162,32 +162,6 @@ public class FluoQueryColumns {
*/
public static final Column TRIPLES = new Column("triples", "SPO");
- /**
- * Stores the Rya assigned PCJ ID that the query's results reflect. This
- * value defines where the results will be exported to.
- * <p>
- * <table border="1" style="width:100%">
- * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
- * <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which PCJ the results of this query will be exported to.</td> </tr>
- * </table>
- * </p>
- */
- public static final Column RYA_PCJ_ID = new Column("query", "ryaPcjId");
-
- /**
- * Associates a PCJ ID with a Query ID. This enables a quick lookup of the Query ID from the PCJ ID and is useful of Deleting PCJs.
- * <p>
- * <table border="1" style="width:100%">
- * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
- * <tr> <td>PCJ ID</td> <td>ryaPcjId:queryId</td> <td>Identifies which Query ID is associated with the given PCJ ID.</td> </tr>
- * </table>
- * </p>
- */
- public static final Column PCJ_ID_QUERY_ID = new Column("ryaPcjId", "queryId");
-
- // Sparql to Query ID used to list all queries that are in the system.
- public static final Column QUERY_ID = new Column("sparql", "queryId");
-
// Query Metadata columns.
public static final Column QUERY_NODE_ID = new Column(QUERY_METADATA_CF, "nodeId");
public static final Column QUERY_VARIABLE_ORDER = new Column(QUERY_METADATA_CF, "variableOrder");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index 1c34836..d5d9fe7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -34,11 +34,11 @@ import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -585,21 +585,13 @@ public class FluoQueryMetadataDAO {
requireNonNull(tx);
requireNonNull(query);
- QueryMetadata queryMetadata = query.getQueryMetadata();
- final String sparql = queryMetadata.getSparql();
- final String queryId = queryMetadata.getNodeId();
- final String pcjId = queryMetadata.getExportId();
-
// The results of the query are eventually exported to an instance
// of Rya, so store the Rya ID for the PCJ.
- tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
- tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
- tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId));
- write(tx, queryMetadata);
+ write(tx, query.getQueryMetadata());
// Write the rest of the metadata objects.
- if (query.getQueryType() == QueryType.Construct) {
+ if (query.getQueryType() == QueryType.CONSTRUCT) {
ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get();
write(tx, constructMetadata);
}
@@ -636,8 +628,9 @@ public class FluoQueryMetadataDAO {
* @param sx - The snapshot that will be used to read the metadata from the Fluo table. (not null)
* @param queryId - The ID of the query whose nodes will be read. (not null)
* @return The {@link FluoQuery} that was read from table.
+ * @throws UnsupportedQueryException
*/
- public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) {
+ public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) throws UnsupportedQueryException {
requireNonNull(sx);
requireNonNull(queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
index e46b405..40c9e03 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
@@ -24,12 +24,11 @@ import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
index 7bf6f45..7b21575 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java
@@ -40,12 +40,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
@@ -106,7 +106,7 @@ public class SparqlFluoQueryBuilder {
//Default behavior is to export to Kafka - subject to change when user can
//specify their own export strategy
- private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka));
+ private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.KAFKA));
public SparqlFluoQueryBuilder setSparql(String sparql) {
this.sparql = Preconditions.checkNotNull(sparql);
@@ -145,7 +145,7 @@ public class SparqlFluoQueryBuilder {
return this;
}
- public FluoQuery build() {
+ public FluoQuery build() throws UnsupportedQueryException {
Preconditions.checkNotNull(sparql);
Preconditions.checkNotNull(queryId);
Preconditions.checkNotNull(exportStrategies);
@@ -172,10 +172,12 @@ public class SparqlFluoQueryBuilder {
QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
//sets {@link QueryType} and VariableOrder
setVarOrderAndQueryType(queryBuilder, te);
- queryBuilder.setSparql(sparql);
- queryBuilder.setChildNodeId(childNodeId);
- queryBuilder.setExportStrategies(exportStrategies);
- queryBuilder.setJoinBatchSize(joinBatchSize);
+ queryBuilder
+ .setSparql(sparql)
+ .setChildNodeId(childNodeId)
+ .setExportStrategies(exportStrategies)
+ .setJoinBatchSize(joinBatchSize);
+
fluoQueryBuilder.setQueryMetadata(queryBuilder);
setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId);
@@ -800,7 +802,7 @@ public class SparqlFluoQueryBuilder {
}
if(queryType == null) {
- queryType = QueryType.Projection;
+ queryType = QueryType.PROJECTION;
}
super.meet(node);
}
@@ -811,14 +813,14 @@ public class SparqlFluoQueryBuilder {
}
if(queryType == null) {
- queryType = QueryType.Construct;
+ queryType = QueryType.CONSTRUCT;
}
super.meet(node);
}
public void meetOther(final QueryModelNode node) throws Exception {
if (node instanceof PeriodicQueryNode) {
- queryType = QueryType.Periodic;
+ queryType = QueryType.PERIODIC;
} else {
super.meetOther(node);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java
new file mode 100644
index 0000000..155b8da
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/UnsupportedQueryException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+
+/**
+ * This Exception thrown if the Rya Fluo Application does not support
+ * the given SPARQL query. This could happen for a number of reasons. The
+ * two most common reasons are that the query possesses some combination of query nodes
+ * that the application can't evaluate, or that the {@link ExportStrategy} of the query
+ * is incompatible with one of its query nodes.
+ *
+ */
+public class UnsupportedQueryException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public UnsupportedQueryException(final String message) {
+ super(message);
+ }
+
+ public UnsupportedQueryException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
index ac41160..7a5b439 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.util;
import java.util.List;
+import java.util.UUID;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -60,6 +61,13 @@ public class FluoQueryUtils {
}
/**
+ * @return - A new pcjId, which is a UUID with all dashes removed
+ */
+ public static String createNewPcjId() {
+ return UUID.randomUUID().toString().replaceAll("-", "");
+ }
+
+ /**
* Uses a {@link NodeIdCollector} visitor to do a pre-order traverse of the
* FluoQuery and gather the nodeIds of the metadata nodes.
* @param query - FluoQuery to be traversed
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
index b9c10d4..cd21ed6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -27,12 +27,13 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameterBase;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
import org.junit.Test;
/**
- * Tests the methods of {@link KafkaExportParameters}.
+ * Tests the methods of {@link KafkaExportParameterBase}.
*/
public class KafkaExportParametersTest {
@@ -41,19 +42,19 @@ public class KafkaExportParametersTest {
final Map<String, String> params = new HashMap<>();
// Load some values into the params using the wrapper.
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
- kafkaParams.setExportToKafka(true);
+ final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(params);
+ kafkaParams.setUseKafkaBindingSetExporter(true);
// Ensure the params map has the expected values.
final Map<String, String> expectedParams = new HashMap<>();
- expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true");
- assertTrue(kafkaParams.isExportToKafka());
+ expectedParams.put(KafkaBindingSetExporterParameters.CONF_USE_KAFKA_BINDING_SET_EXPORTER, "true");
+ assertTrue(kafkaParams.getUseKafkaBindingSetExporter());
assertEquals(expectedParams, params);
// now go the other way.
- expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false");
- kafkaParams.setExportToKafka(false);
- assertFalse(kafkaParams.isExportToKafka());
+ expectedParams.put(KafkaBindingSetExporterParameters.CONF_USE_KAFKA_BINDING_SET_EXPORTER, "false");
+ kafkaParams.setUseKafkaBindingSetExporter(false);
+ assertFalse(kafkaParams.getUseKafkaBindingSetExporter());
assertEquals(expectedParams, params);
}
@Test
@@ -68,7 +69,7 @@ public class KafkaExportParametersTest {
// Make sure export key1 is NOT kept separate from producer config key1
// This is a change, originally they were kept separate.
params.put(key1, value1First);
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
+ final KafkaExportParameterBase kafkaParams = new KafkaExportParameterBase(params);
// Load some values into the properties using the wrapper.
Properties props = new Properties();
props.put(key1, value1Second);
@@ -87,8 +88,8 @@ public class KafkaExportParametersTest {
final Map<String, String> params = new HashMap<>();
// Ensure an unconfigured parameters map will say kafka export is disabled.
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
- assertFalse(kafkaParams.isExportToKafka());
+ final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(params);
+ assertFalse(kafkaParams.getUseKafkaBindingSetExporter());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
index 9ac5139..5653312 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParametersTest.java
@@ -37,7 +37,7 @@ public class RyaExportParametersTest {
// Load some values into the params using the wrapper.
final RyaExportParameters ryaParams = new RyaExportParameters(params);
- ryaParams.setExportToRya(true);
+ ryaParams.setUseRyaBindingSetExporter(true);
ryaParams.setAccumuloInstanceName("demoAccumulo");
ryaParams.setZookeeperServers("zoo1;zoo2");
ryaParams.setExporterUsername("fluo");
@@ -45,7 +45,7 @@ public class RyaExportParametersTest {
// Ensure the params map has the expected values.
final Map<String, String> expectedParams = new HashMap<>();
- expectedParams.put(RyaExportParameters.CONF_EXPORT_TO_RYA, "true");
+ expectedParams.put(RyaExportParameters.CONF_USE_RYA_BINDING_SET_EXPORTER, "true");
expectedParams.put(RyaExportParameters.CONF_ACCUMULO_INSTANCE_NAME, "demoAccumulo");
expectedParams.put(RyaExportParameters.CONF_ZOOKEEPER_SERVERS, "zoo1;zoo2");
expectedParams.put(RyaExportParameters.CONF_EXPORTER_USERNAME, "fluo");
@@ -60,6 +60,6 @@ public class RyaExportParametersTest {
// Ensure an unconfigured parameters map will say rya export is disabled.
final RyaExportParameters ryaParams = new RyaExportParameters(params);
- assertFalse(ryaParams.isExportToRya());
+ assertFalse(ryaParams.getUseRyaBindingSetExporter());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
index b40ba3f..55455a7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java
@@ -154,7 +154,7 @@ public class PeriodicQueryUtilTest {
}
@Test
- public void testFluoQueryVarOrders() throws MalformedQueryException {
+ public void testFluoQueryVarOrders() throws MalformedQueryException, UnsupportedQueryException {
String query = "prefix function: <http://org.apache.rya/function#> " //n
+ "prefix time: <http://www.w3.org/2006/time#> " //n
+ "select (count(?obs) as ?total) where {" //n
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
index 5c89a75..48f2f39 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
public class QueryMetadataVisitorTest {
@Test
- public void builderTest() {
+ public void builderTest() throws UnsupportedQueryException {
String query = "prefix function: <http://org.apache.rya/function#> " // n
+ "prefix time: <http://www.w3.org/2006/time#> " // n
+ "select ?id (count(?obs) as ?total) where {" // n
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
index 901f39d..cc74f6b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClient.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ArgumentsException;
import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand.ExecutionException;
import org.apache.rya.indexing.pcj.fluo.client.command.CountUnprocessedStatementsCommand;
@@ -152,6 +153,9 @@ public class PcjAdminClient {
System.err.println("Could not execute the command.");
e.printStackTrace();
System.exit(-1);
+ } catch (UnsupportedQueryException e) {
+ System.err.println("Could not execute the command because the query is invalid.");
+ e.printStackTrace();
} finally {
log.trace("Shutting down the PCJ Admin Client.");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
index 2b3b105..a944b33 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/PcjAdminClientCommand.java
@@ -24,6 +24,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.rdftriplestore.RyaSailRepository;
/**
@@ -57,13 +58,14 @@ public interface PcjAdminClientCommand {
* @param rya - A connection to the Rya instance used to search for historic PCJ matches. (not null)
* @param client - A connection to the Fluo app that is updating the PCJs. (not null)
* @param args - Command line arguments that configure how the command will execute. (not null)
+ * @throws UnsupportedQueryException
*/
public void execute(
final Connector accumulo,
final String ryaTablePrefix,
final RyaSailRepository rya,
final FluoClient fluo,
- final String[] args) throws ArgumentsException, ExecutionException;
+ final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException;
/**
* A {@link PcjAdminClientCommand} could not be executed because of a problem with
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
index 3f335f4..78515d9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
@@ -42,6 +42,7 @@ import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest;
import org.apache.rya.indexing.pcj.storage.PcjException;
@@ -94,7 +95,7 @@ public class NewQueryCommand implements PcjAdminClientCommand {
}
@Override
- public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException {
+ public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException {
checkNotNull(accumulo);
checkNotNull(fluo);
checkNotNull(args);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
index 675a844..2a7f787 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport;
import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.client.util.QueryReportRenderer;
import com.beust.jcommander.JCommander;
@@ -69,7 +70,7 @@ public class QueryReportCommand implements PcjAdminClientCommand {
}
@Override
- public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException {
+ public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException, UnsupportedQueryException {
checkNotNull(accumulo);
checkNotNull(ryaTablePrefix);
checkNotNull(rya);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
index f44db6c..d1b3e25 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java
@@ -20,12 +20,9 @@ package org.apache.rya.indexing.pcj.fluo.client.util;
import static com.google.common.base.Preconditions.checkNotNull;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.commons.lang3.StringUtils;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -38,6 +35,9 @@ import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;
import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* Pretty renders a {@link QueryReport}.
*/
@@ -70,7 +70,7 @@ public class QueryReportRenderer {
- if (metadata.getQueryType() == QueryType.Construct) {
+ if (metadata.getQueryType() == QueryType.CONSTRUCT) {
builder.appendItem( new ReportItem("") );
final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
index e8f10b8..1ae02dd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java
@@ -304,7 +304,7 @@ public class DemoDriver {
// Provide export parameters child test classes may provide to the export observer.
final HashMap<String, String> exportParams = new HashMap<>();
final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
- ryaParams.setExportToRya(true);
+ ryaParams.setUseRyaBindingSetExporter(true);
ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
ryaParams.setZookeeperServers(accumulo.getZooKeepers());
ryaParams.setExporterUsername("root");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index f25b573..4070849 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -18,7 +18,6 @@
*/
package org.apache.rya.indexing.pcj.fluo.demo;
-import java.io.IOException;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
@@ -35,22 +34,20 @@ import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.openrdf.model.Statement;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;
import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.SailException;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
@@ -181,7 +178,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
// Tell the Fluo app to maintain it.
new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix);
- } catch (MalformedQueryException | PcjException | RyaDAOException e) {
+ } catch (MalformedQueryException | PcjException | RyaDAOException | UnsupportedQueryException e) {
throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 263a19e..7676657 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -32,6 +32,7 @@ import org.apache.fluo.api.client.FluoFactory;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -53,7 +54,7 @@ import com.google.common.collect.Sets;
public class GetPcjMetadataIT extends RyaExportITBase {
@Test
- public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException {
+ public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException, UnsupportedQueryException {
final String sparql =
"SELECT ?x " +
"WHERE { " +
@@ -82,7 +83,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
}
@Test
- public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException {
+ public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException, UnsupportedQueryException {
final Connector accumuloConn = super.getAccumuloConnector();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index e3914bd..3310690 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -18,7 +18,7 @@
*/
package org.apache.rya.indexing.pcj.fluo.api;
-import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_NODE_ID;
import static org.junit.Assert.assertEquals;
import java.util.List;
@@ -49,10 +49,10 @@ public class ListQueryIdsIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Store a few SPARQL/Query ID pairs in the Fluo table.
try(Transaction tx = fluoClient.newTransaction()) {
- tx.set("SPARQL_3", QUERY_ID, "ID_3");
- tx.set("SPARQL_1", QUERY_ID, "ID_1");
- tx.set("SPARQL_4", QUERY_ID, "ID_4");
- tx.set("SPARQL_2", QUERY_ID, "ID_2");
+ tx.set("SPARQL_3", QUERY_NODE_ID, "ID_3");
+ tx.set("SPARQL_1", QUERY_NODE_ID, "ID_1");
+ tx.set("SPARQL_4", QUERY_NODE_ID, "ID_4");
+ tx.set("SPARQL_2", QUERY_NODE_ID, "ID_2");
tx.commit();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index 315dddb..45492de 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -29,9 +29,9 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
@@ -148,11 +148,11 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
// Create the object that will be serialized.
String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY);
final QueryMetadata.Builder builder = QueryMetadata.builder(queryId);
- builder.setQueryType(QueryType.Projection);
+ builder.setQueryType(QueryType.PROJECTION);
builder.setVarOrder(new VariableOrder("y;s;d"));
builder.setSparql("sparql string");
builder.setChildNodeId("childNodeId");
- builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.Kafka)));
+ builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.KAFKA)));
final QueryMetadata originalMetadata = builder.build();
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -338,7 +338,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
}
@Test
- public void fluoQueryTest() throws MalformedQueryException {
+ public void fluoQueryTest() throws MalformedQueryException, UnsupportedQueryException {
final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
// Create the object that will be serialized.
@@ -357,7 +357,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
final FluoQuery originalQuery = builder.build();
- assertEquals(QueryType.Projection, originalQuery.getQueryType());
+ assertEquals(QueryType.PROJECTION, originalQuery.getQueryType());
assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent());
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -379,7 +379,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
}
@Test
- public void fluoConstructQueryTest() throws MalformedQueryException {
+ public void fluoConstructQueryTest() throws MalformedQueryException, UnsupportedQueryException {
final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
// Create the object that will be serialized.
@@ -398,7 +398,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
final FluoQuery originalQuery = builder.build();
- assertEquals(QueryType.Construct, originalQuery.getQueryType());
+ assertEquals(QueryType.CONSTRUCT, originalQuery.getQueryType());
assertEquals(true, originalQuery.getConstructQueryMetadata().isPresent());
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -421,7 +421,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
@Test
- public void fluoNestedQueryTest() throws MalformedQueryException {
+ public void fluoNestedQueryTest() throws MalformedQueryException, UnsupportedQueryException {
final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
// Create the object that will be serialized.
@@ -442,7 +442,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
final FluoQuery originalQuery = builder.build();
- assertEquals(QueryType.Projection, originalQuery.getQueryType());
+ assertEquals(QueryType.PROJECTION, originalQuery.getQueryType());
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Write it to the Fluo table.
@@ -463,7 +463,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
}
@Test
- public void fluoNestedConstructQueryTest() throws MalformedQueryException {
+ public void fluoNestedConstructQueryTest() throws MalformedQueryException, UnsupportedQueryException {
final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
// Create the object that will be serialized.
@@ -488,7 +488,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase {
builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
final FluoQuery originalQuery = builder.build();
- assertEquals(QueryType.Construct, originalQuery.getQueryType());
+ assertEquals(QueryType.CONSTRUCT, originalQuery.getQueryType());
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Write it to the Fluo table.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 32d0e41..47a2f29 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -53,6 +53,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -343,7 +344,7 @@ public class BatchIT extends RyaExportITBase {
return statements;
}
- private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) {
+ private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) throws UnsupportedQueryException {
List<String> nodeStrings;
try (Snapshot sx = fluoClient.newSnapshot()) {
FluoQuery query = dao.readFluoQuery(sx, queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 7c4caa4..a1d76cb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -33,6 +33,7 @@ import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Span;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
@@ -79,7 +80,7 @@ public class CreateDeleteIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(20, rows.size());
+ assertEquals(18, rows.size());
// Delete the PCJ from the Fluo application.
new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
@@ -111,7 +112,7 @@ public class CreateDeleteIT extends RyaExportITBase {
try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(12, rows.size());
+ assertEquals(10, rows.size());
// Delete the PCJ from the Fluo application.
new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
@@ -130,7 +131,7 @@ public class CreateDeleteIT extends RyaExportITBase {
// Register the PCJ with Rya.
final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector());
- final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet(ExportStrategy.NO_OP_EXPORT));
// Write the data to Rya.
final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index f9f55d0..8911f56 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -92,6 +92,8 @@ public class KafkaExportIT extends KafkaExportITBase {
// Create the PCJ in Fluo and load the statements into Rya.
final String pcjId = loadData(sparql, statements);
+ FluoITHelper.printFluoTable(super.getFluoConfiguration());
+
// The expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expectedResult = new HashSet<>();
@@ -590,9 +592,9 @@ public class KafkaExportIT extends KafkaExportITBase {
// Read all of the results from the Kafka topic.
final Set<VisibilityBindingSet> results = new HashSet<>();
- try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
- final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000);
- final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator();
+ try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+ final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+ final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
while (recordIterator.hasNext()) {
results.add( recordIterator.next().value() );
}
@@ -607,9 +609,9 @@ public class KafkaExportIT extends KafkaExportITBase {
// Read the results from the Kafka topic. The last one has the final aggregation result.
VisibilityBindingSet result = null;
- try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
- final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000);
- final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator();
+ try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+ final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+ final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
while (recordIterator.hasNext()) {
result = recordIterator.next().value();
}
@@ -625,9 +627,9 @@ public class KafkaExportIT extends KafkaExportITBase {
// The key in this map is a Binding Set containing only the group by variables.
final Map<BindingSet, VisibilityBindingSet> results = new HashMap<>();
- try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
- final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000);
- final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator();
+ try(final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+ final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+ final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
while (recordIterator.hasNext()) {
final VisibilityBindingSet visBindingSet = recordIterator.next().value();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
index ca8de0d..b2944ca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
@@ -33,13 +33,12 @@ import java.util.stream.Collectors;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.FluoITHelper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaSubGraph;
@@ -48,13 +47,14 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
@@ -88,22 +88,18 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
+ observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
+
// Configure the export observer to export new PCJ results to the mini
// accumulo cluster.
final HashMap<String, String> exportParams = new HashMap<>();
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
- kafkaParams.setExportToKafka(true);
-
- // Configure the Kafka Producer
- final Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
- kafkaParams.addAllProducerConfig(producerConfig);
+ final KafkaSubGraphExporterParameters kafkaParams = new KafkaSubGraphExporterParameters(exportParams);
+ kafkaParams.setUseKafkaSubgraphExporter(true);
+ kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
- final ObserverSpecification exportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+ final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(),
exportParams);
observers.add(exportObserverConfig);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 6ecec02..0aefaca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -34,9 +34,11 @@ import javax.xml.datatype.DatatypeFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
@@ -877,7 +879,7 @@ public class QueryIT extends RyaExportITBase {
runTest(query, statements, expectedResults, ExporterType.Periodic);
}
- @Test(expected= IllegalArgumentException.class)
+ @Test(expected= UnsupportedQueryException.class)
public void nestedConstructPeriodicQueryWithAggregationAndGroupBy() throws Exception {
String query = "prefix function: <http://org.apache.rya/function#> " // n
+ "prefix time: <http://www.w3.org/2006/time#> " // n
@@ -924,7 +926,7 @@ public class QueryIT extends RyaExportITBase {
PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
String periodicId = periodicStorage.createPeriodicQuery(sparql);
try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
- new CreateFluoPcj().createPcj(periodicId, sparql, fluo);
+ new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo);
}
addStatementsAndWait(statements);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index c828a20..ed9ce60 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -37,25 +37,25 @@ import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.fluo.recipes.test.FluoITHelper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
@@ -74,6 +74,8 @@ import org.openrdf.model.Statement;
import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.Sail;
+import com.google.common.collect.Sets;
+
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
@@ -119,41 +121,20 @@ public class KafkaExportITBase extends AccumuloExportITBase {
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
+ observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
// Configure the export observer to export new PCJ results to the mini
// accumulo cluster.
final HashMap<String, String> exportParams = new HashMap<>();
-
- final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
- kafkaParams.setExportToKafka(true);
-
- // Configure the Kafka Producer
- final Properties producerConfig = new Properties();
- producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
- producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
- kafkaParams.addAllProducerConfig(producerConfig);
+ final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams);
+ kafkaParams.setUseKafkaBindingSetExporter(true);
+ kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
+
+ final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams);
+ kafkaConstructParams.setUseKafkaSubgraphExporter(true);
final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
observers.add(exportObserverConfig);
-
- //create construct query observer and tell it not to export to Kafka
- //it will only add results back into Fluo
- HashMap<String, String> constructParams = new HashMap<>();
- final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams);
- kafkaConstructParams.setExportToKafka(true);
-
- // Configure the Kafka Producer
- final Properties constructProducerConfig = new Properties();
- constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
- constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
- kafkaConstructParams.addAllProducerConfig(constructProducerConfig);
-
- final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(),
- constructParams);
- observers.add(constructExportObserverConfig);
// Add the observers to the Fluo Configuration.
super.getFluoConfiguration().addObservers(observers);
@@ -323,21 +304,19 @@ public class KafkaExportITBase extends AccumuloExportITBase {
consumer.close();
}
- protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) {
+ protected KafkaConsumer<String, VisibilityBindingSet> makeConsumer(final String TopicName) {
// setup consumer
final Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
- consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.IntegerDeserializer");
- consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoVisibilityBindingSetSerializer.class.getName());
// to make sure the consumer starts from the beginning of the topic
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
+ final KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(TopicName));
return consumer;
}
@@ -353,7 +332,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
- final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA));
// Write the data to Rya.
final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
index 9c5732f..1c02db3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
@@ -65,7 +65,8 @@ public class RyaExportITBase extends FluoITBase {
// Configure the export observer to export new PCJ results to the mini accumulo cluster.
final HashMap<String, String> exportParams = new HashMap<>();
final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
- ryaParams.setExportToRya(true);
+ ryaParams.setUseRyaBindingSetExporter(true);
+ ryaParams.setUsePeriodicBindingSetExporter(true);
ryaParams.setRyaInstanceName(getRyaInstanceName());
ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
index 4d1bc75..cf24974 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -26,6 +26,7 @@ import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.fluo.recipes.test.AccumuloExportITBase;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
@@ -38,7 +39,7 @@ import org.junit.Assert;
public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
@Test
- public void testProvider() throws MalformedQueryException, InterruptedException {
+ public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException {
String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ "prefix time: <http://www.w3.org/2006/time#> " // n
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
index 27acc9c..bb98b7f 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -38,12 +38,10 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.client.FluoClientImpl;
-import org.apache.fluo.recipes.test.FluoITHelper;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
@@ -252,14 +250,14 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
}
}
- private void compareFluoCounts(FluoClient client, String queryId, long bin) {
+ private void compareFluoCounts(FluoClient client, String pcjId, long bin) {
QueryBindingSet bs = new QueryBindingSet();
bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG));
VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
try(Snapshot sx = client.newSnapshot()) {
- String fluoQueryId = sx.get(Bytes.of(queryId), FluoQueryColumns.PCJ_ID_QUERY_ID).toString();
+ String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
Set<String> ids = new HashSet<>();
PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids);
for(String id: ids) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
index 6aade52..60a3e7c 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
@@ -21,8 +21,10 @@ package org.apache.rya.periodic.notification.api;
import java.util.Optional;
import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
@@ -32,7 +34,7 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.evaluation.function.Function;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
/**
* Object that creates a Periodic Query. A Periodic Query is any query
@@ -82,17 +84,22 @@ public class CreatePeriodicQuery {
Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
if(optNode.isPresent()) {
PeriodicQueryNode periodicNode = optNode.get();
+ String pcjId = FluoQueryUtils.createNewPcjId();
+
+ //register query with Fluo
CreateFluoPcj createPcj = new CreateFluoPcj();
- String queryId = createPcj.createPcj(sparql, fluoClient).getQueryId();
- queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
- periodicStorage.createPeriodicQuery(queryId, sparql);
- PeriodicNotification notification = PeriodicNotification.builder().id(queryId).period(periodicNode.getPeriod())
+ createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluoClient);
+
+ //register query with PeriodicResultStorage table
+ periodicStorage.createPeriodicQuery(pcjId, sparql);
+ //create notification
+ PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod())
.timeUnit(periodicNode.getUnit()).build();
return notification;
} else {
throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter.");
}
- } catch (MalformedQueryException | PeriodicQueryStorageException e) {
+ } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
throw new RuntimeException(e);
}
}