You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/12/05 19:49:22 UTC
[1/4] incubator-rya git commit: FluoQueryMetadataCache
Repository: incubator-rya
Updated Branches:
refs/heads/master 8acd24b5e -> 62de7c5d1
FluoQueryMetadataCache
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/6d2bfcbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/6d2bfcbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/6d2bfcbc
Branch: refs/heads/master
Commit: 6d2bfcbcc1fe68e74521724d6f5490a6b9c70038
Parents: 8acd24b
Author: Caleb Meier <ca...@parsons.com>
Authored: Thu Oct 26 14:53:56 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Tue Nov 21 07:30:45 2017 -0800
----------------------------------------------------------------------
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 48 ++--
.../fluo/app/observers/BindingSetUpdater.java | 15 +-
.../observers/ConstructQueryResultObserver.java | 11 +-
.../fluo/app/observers/QueryResultObserver.java | 8 +-
.../pcj/fluo/app/observers/TripleObserver.java | 5 +-
.../fluo/app/query/FluoQueryMetadataCache.java | 241 +++++++++++++++++++
.../fluo/app/query/MetadataCacheSupplier.java | 44 ++++
.../app/query/FluoQueryMetadataCacheTest.java | 34 +++
.../pcj/fluo/integration/FluoLatencyIT.java | 169 +++++++++++++
.../pcj/fluo/test/base/KafkaExportITBase.java | 11 +-
10 files changed, 538 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index 01da2dc..fd624eb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -1,25 +1,16 @@
<?xml version="1.0" encoding="utf-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rya</groupId>
@@ -41,12 +32,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
-
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <!-- Uncommment this block when rya.pcj.fluo.app becomes a leaf project. RYA-341 -->
- <!-- <version>13.0</version> Overriding Rya's Guava version to be compatible with Fluo's required version. Alternative is relocation with shade. -->
+ <!-- Uncommment this block when rya.pcj.fluo.app becomes a leaf
+ project. RYA-341 -->
+ <!-- <version>13.0</version> Overriding Rya's Guava version to
+ be compatible with Fluo's required version. Alternative is relocation with
+ shade. -->
</dependency>
<!-- Rya Runtime Dependencies. -->
@@ -75,7 +68,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
-
+
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
@@ -87,6 +80,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index c0cfa1d..9e47132 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -36,8 +36,9 @@ import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
@@ -55,7 +56,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
public abstract class BindingSetUpdater extends AbstractObserver {
private static final Logger log = Logger.getLogger(BindingSetUpdater.class);
// DAO
- protected final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+ protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
// Updaters
private final JoinResultUpdater joinUpdater = new JoinResultUpdater();
@@ -117,9 +118,9 @@ public abstract class BindingSetUpdater extends AbstractObserver {
} catch (final Exception e) {
throw new RuntimeException("Could not process a Query node.", e);
}
- break;
-
- case CONSTRUCT:
+ break;
+
+ case CONSTRUCT:
final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
try{
constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery);
@@ -127,7 +128,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
throw new RuntimeException("Could not process a Query node.", e);
}
break;
-
+
case FILTER:
final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId);
try {
@@ -145,7 +146,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
throw new RuntimeException("Could not process a Join node.", e);
}
break;
-
+
case PERIODIC_QUERY:
final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId);
try{
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index 61e7244..09d9ede 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -29,6 +29,8 @@ import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
/**
* Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
@@ -40,6 +42,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
public class ConstructQueryResultObserver extends AbstractObserver {
private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
+ protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
@Override
public ObservedColumn getObservedColumn() {
@@ -48,14 +51,14 @@ public class ConstructQueryResultObserver extends AbstractObserver {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-
+
//Build row for parent that result will be written to
BindingSetRow bsRow = BindingSetRow.make(row);
String constructNodeId = bsRow.getNodeId();
String bsString= bsRow.getBindingSetString();
- String parentNodeId = tx.get(Bytes.of(constructNodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
+ String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
-
+
//Get NodeType of the parent node
NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
//Get data for the ConstructQuery result
@@ -63,5 +66,5 @@ public class ConstructQueryResultObserver extends AbstractObserver {
//Write result to parent
tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 9514932..78d0ec5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -35,7 +35,8 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporte
import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +50,7 @@ import com.google.common.collect.ImmutableSet;
public class QueryResultObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class);
- private static final FluoQueryMetadataDAO DAO = new FluoQueryMetadataDAO();
-
+ protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
/**
* Builders for each type of {@link IncrementalBindingSetExporter} we support.
*/
@@ -101,7 +101,7 @@ public class QueryResultObserver extends AbstractObserver {
// Read the queryId from the row and get the QueryMetadata.
final String queryId = row.split(NODEID_BS_DELIM)[0];
- final QueryMetadata metadata = DAO.readQueryMetadata(tx, queryId);
+ final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId);
// Read the Child Binding Set that will be exported.
final Bytes valueBytes = tx.get(brow, col);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 2d7f390..d6fd8bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -34,7 +34,8 @@ import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -55,7 +56,7 @@ public class TripleObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(TripleObserver.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new FluoQueryMetadataDAO();
+ private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
public TripleObserver() {}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
new file mode 100644
index 0000000..8adc40d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+ private final FluoQueryMetadataDAO dao;
+ private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache;
+ private final Cache<String, Bytes> metadataCache;
+ private int capacity;
+ private int concurrencyLevel;
+
+ /**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+ public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+ this.dao = dao;
+ commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+ metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+ this.capacity = capacity;
+ this.concurrencyLevel = concurrencyLevel;
+ }
+
+ /**
+ * @return - capacity of this cache in terms of max number of entries
+ */
+ public int getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on
+ * without waiting for other threads
+ */
+ public int getConcurrencyLevel() {
+ return concurrencyLevel;
+ }
+
+ @Override
+ public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+ return dao.readStatementPatternMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
+ LOG.debug("Retrieving Metadata from Cache: {}.", nodeId);
+ return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readJoinMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readFilterMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ try {
+ return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readProjectionMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readAggregationMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readConstructQueryMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readPeriodicQueryMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readQueryMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) {
+ Optional<NodeType> type = NodeType.fromNodeId(rowId);
+ try {
+ checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
+ return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() {
+ @Override
+ public Bytes call() throws Exception {
+ return tx.get(Bytes.of(rowId), column);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e);
+ }
+ }
+
+ private String getKey(String row, Column column) {
+ return row + ":" + column.getsQualifier() + ":" + column.getsQualifier();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
new file mode 100644
index 0000000..faab952
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
@@ -0,0 +1,44 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheSupplier {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
+ private static FluoQueryMetadataCache CACHE;
+ private static boolean initialized = false;
+ private static final int DEFAULT_CAPACITY = 10000;
+ private static final int DEFAULT_CONCURRENCY = 8;
+
+ /**
+ * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the
+ * indicated capacity and concurrencyLevel if one is provided.
+ *
+ * @param capacity - capacity used to create a new cache
+ * @param concurrencyLevel - concurrencyLevel used to create a new cache
+ */
+ public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) {
+ if (!initialized) {
+ LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
+ concurrencyLevel);
+ CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
+ initialized = true;
+ } else {
+ LOG.debug("Cache has already been initialized. Returning cache with capacity: {} and concurrencylevel: {}",
+ CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+ }
+ return CACHE;
+ }
+
+ /**
+ * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it
+ * with a default size of 10000 entries and a default concurrency level of 8.
+ *
+ * @return - FluoQueryMetadataCache with default instance name and default capacity and concurrency
+ */
+ public static FluoQueryMetadataCache getOrCreateCache() {
+ return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
new file mode 100644
index 0000000..3df3708
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
@@ -0,0 +1,34 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class FluoQueryMetadataCacheTest {
+
+ @Test
+ public void testCache() {
+ FluoQueryMetadataDAO mockDAO = Mockito.mock(FluoQueryMetadataDAO.class);
+ Transaction mockTx = Mockito.mock(Transaction.class);
+ String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+ StatementPatternMetadata metadata = StatementPatternMetadata.builder(nodeId).setParentNodeId("parent")
+ .setStatementPattern("pattern").setVarOrder(new VariableOrder("xyz")).build();
+ when(mockDAO.readStatementPatternMetadata(mockTx, nodeId)).thenReturn(metadata);
+
+ FluoQueryMetadataCache cache = new FluoQueryMetadataCache(mockDAO, 20, 2);
+
+ assertEquals(metadata, cache.readStatementPatternMetadata(mockTx, nodeId));
+
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+
+ Mockito.verify(mockDAO, Mockito.times(1)).readStatementPatternMetadata(mockTx, nodeId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
new file mode 100644
index 0000000..fabf512
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
@@ -0,0 +1,169 @@
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static java.util.Objects.requireNonNull;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class FluoLatencyIT extends KafkaExportITBase {
+ private static ValueFactory vf;
+ private static DatatypeFactory dtf;
+
+ @BeforeClass
+ public static void init() throws DatatypeConfigurationException {
+ vf = new ValueFactoryImpl();
+ dtf = DatatypeFactory.newInstance();
+ }
+
+ @Test
+ public void resultsExported() throws Exception {
+
+ final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { "
+ + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "} " + "group by ?type";
+
+// final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { "
+// + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "}";
+
+ try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ String pcjId = FluoQueryUtils.createNewPcjId();
+ FluoConfiguration conf = super.getFluoConfiguration();
+ new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient);
+ SailRepositoryConnection conn = super.getRyaSailRepository().getConnection();
+
+ long start = System.currentTimeMillis();
+ int numReturned = 0;
+ int numObs = 10;
+ int numTypes = 5;
+ int numExpected = 0;
+ int increment = numObs*numTypes;
+ while (System.currentTimeMillis() - start < 60000) {
+ List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now());
+ conn.add(statements);
+ numExpected += increment;
+ System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: "
+ + getNumFluoEntries(fluoClient));
+ numReturned += readAllResults(pcjId).size();
+ System.out
+ .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned));
+// FluoITHelper.printFluoTable(conf);
+ Thread.sleep(30000);
+ }
+ }
+ }
+
+ /**
+ * Generates (numObservationsPerType x numTypes) statements of the form:
+ *
+ * <pre>
+ * urn:obs_n uri:hasTime zonedTime
+ * urn:obs_n uri:hasObsType typePrefix_m
+ * </pre>
+ *
+ * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
+ * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes.
+ *
+ * @param numObservationsPerType - The quantity of observations per type to generate.
+ * @param numTypes - The number of types to generate observations for.
+ * @param typePrefix - The prefix to be used for the type literal in the statement.
+ * @param observationOffset - The offset to be used for determining the value of n in the above statements.
+ * @param zonedTime - The time to be used for all observations generated.
+ * @return A new list of all generated Statements.
+ */
+ public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix,
+ final long observationOffset, final ZonedDateTime zonedTime) {
+ final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
+ final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
+ final List<Statement> statements = Lists.newArrayList();
+
+ for (long i = 0; i < numObservationsPerType; i++) {
+ for (int j = 0; j < numTypes; j++) {
+ final long observationId = observationOffset + i * numTypes + j;
+ // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId;
+ // final String obsId = "urn:obs_" + observationId;
+ final String obsId = "urn:obs_" + String.format("%020d", observationId);
+ final String type = typePrefix + j;
+ // logger.info(obsId + " " + type + " " + litTime);
+ statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime));
+ statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type)));
+ }
+ }
+
+ return statements;
+ }
+
+ private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
+ requireNonNull(pcjId);
+
+ // Read all of the results from the Kafka topic.
+ final Set<VisibilityBindingSet> results = new HashSet<>();
+
+ try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+ final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+ final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
+ while (recordIterator.hasNext()) {
+ results.add(recordIterator.next().value());
+ }
+ }
+
+ return results;
+ }
+
+ private int getNumAccEntries(String tableName) throws TableNotFoundException {
+ Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations());
+ int count = 0;
+ for (Map.Entry<Key, Value> entry : scanner) {
+ count++;
+ }
+ return count;
+ }
+
+ private int getNumFluoEntries(FluoClient client) {
+ Transaction tx = client.newTransaction();
+ CellScanner scanner = tx.scanner().build();
+ int count = 0;
+ for (RowColumnValue rcv : scanner) {
+ count++;
+ }
+ return count;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 59fe54f..7b16dcf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -69,7 +69,6 @@ import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.sail.config.RyaSailFactory;
import org.junit.After;
import org.junit.Before;
-import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.Sail;
@@ -129,7 +128,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams);
kafkaParams.setUseKafkaBindingSetExporter(true);
kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
-
+
final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams);
kafkaConstructParams.setUseKafkaSubgraphExporter(true);
@@ -262,7 +261,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
* If this test fails then its a testing environment issue, not with Rya.
* Source: https://github.com/asmaier/mini-kafka
*/
- @Test
+// @Test
public void embeddedKafkaTest() throws Exception {
// create topic
final String topic = "testTopic";
@@ -339,9 +338,9 @@ public class KafkaExportITBase extends AccumuloExportITBase {
// The PCJ Id is the topic name the results will be written to.
return pcjId;
}
-
+
protected void loadData(final Collection<Statement> statements) throws Exception {
-
+
requireNonNull(statements);
final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
@@ -352,7 +351,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
// Wait for the Fluo application to finish computing the end result.
super.getMiniFluo().waitForObservers();
-
+
}
}
[3/4] incubator-rya git commit: RYA-406. Closes #251.
Posted by ca...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
index 749a77d..d56574e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
@@ -30,18 +30,18 @@ import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
import com.google.common.base.Preconditions;
/**
- * This class processes {@link SpanBatchDeleteInformation} objects by
- * deleting the entries in the Fluo Column corresponding to the {@link Span}
- * of the BatchInformation object. This class will delete entries until the
- * batch size is met, and then create a new SpanBatchDeleteInformation object
- * with an updated Span whose starting point is the stopping point of this
- * batch. If the batch limit is not met, then a new batch is not created and
- * the task is complete.
+ * This class processes {@link SpanBatchDeleteInformation} objects by deleting the entries in the Fluo Column
+ * corresponding to the {@link Span} of the BatchInformation object. This class will delete entries until the batch size
+ * is met, and then create a new SpanBatchDeleteInformation object with an updated Span whose starting point is the
+ * stopping point of this batch. If the batch limit is not met, then a new batch is not created and the task is
+ * complete.
*
*/
public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
@@ -49,8 +49,8 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class);
/**
- * Process SpanBatchDeleteInformation objects by deleting all entries indicated
- * by Span until batch limit is met.
+ * Process SpanBatchDeleteInformation objects by deleting all entries indicated by Span until batch limit is met.
+ *
* @param tx - Fluo Transaction
* @param row - Byte row identifying BatchInformation
* @param batch - SpanBatchDeleteInformation object to be processed
@@ -60,6 +60,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
super.processBatch(tx, row, batch);
Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation);
SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch;
+ Optional<String> nodeId = spanBatch.getNodeId();
Task task = spanBatch.getTask();
int batchSize = spanBatch.getBatchSize();
Span span = spanBatch.getSpan();
@@ -71,7 +72,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed.");
break;
case Delete:
- rowCol = deleteBatch(tx, span, column, batchSize);
+ rowCol = deleteBatch(tx, nodeId, span, column, batchSize);
break;
case Update:
log.trace("The Task Update is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed.");
@@ -90,7 +91,7 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
}
}
- private Optional<RowColumn> deleteBatch(TransactionBase tx, Span span, Column column, int batchSize) {
+ private Optional<RowColumn> deleteBatch(TransactionBase tx, Optional<String> nodeId, Span span, Column column, int batchSize) {
log.trace("Deleting batch of size: " + batchSize + " using Span: " + span + " and Column: " + column);
RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build();
@@ -100,18 +101,39 @@ public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
int count = 0;
boolean batchLimitMet = false;
Bytes row = span.getStart().getRow();
+
+ //get prefix if nodeId is specified
+ Optional<Bytes> prefixBytes = Optional.empty();
+ if (nodeId.isPresent()) {
+ NodeType type = NodeType.fromNodeId(nodeId.get()).get();
+ prefixBytes = Optional.ofNullable(Bytes.of(type.getNodeTypePrefix()));
+ }
+
while (colScannerIter.hasNext() && !batchLimitMet) {
ColumnScanner colScanner = colScannerIter.next();
row = colScanner.getRow();
- Iterator<ColumnValue> iter = colScanner.iterator();
- while (iter.hasNext()) {
- if (count >= batchSize) {
- batchLimitMet = true;
- break;
+
+ //extract the nodeId from the returned row if a nodeId was passed
+ //into the SpanBatchInformation. This is to ensure that the returned
+ //row nodeId is equal to the nodeId passed in to the span batch information
+ Optional<String> rowNodeId = Optional.empty();
+ if (prefixBytes.isPresent()) {
+ rowNodeId = Optional.of(BindingSetRow.makeFromShardedRow(prefixBytes.get(), row).getNodeId());
+ }
+
+ //if nodeId is present, then results returned by span are filtered
+ //on the nodeId. This occurs when the hash is not included in the span
+ if (!rowNodeId.isPresent() || rowNodeId.equals(nodeId)) {
+ Iterator<ColumnValue> iter = colScanner.iterator();
+ while (iter.hasNext()) {
+ if (count >= batchSize) {
+ batchLimitMet = true;
+ break;
+ }
+ ColumnValue colVal = iter.next();
+ tx.delete(row, colVal.getColumn());
+ count++;
}
- ColumnValue colVal = iter.next();
- tx.delete(row, colVal.getColumn());
- count++;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
index 3b1e245..87158b7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
@@ -1,4 +1,8 @@
package org.apache.rya.indexing.pcj.fluo.app.batch;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Optional;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,22 +23,38 @@ package org.apache.rya.indexing.pcj.fluo.app.batch;
*/
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
/**
* This class represents a batch order to delete all entries in the Fluo table indicated
* by the given Span and Column. These batch orders are processed by the {@link BatchObserver},
* which uses this batch information along with the nodeId passed into the Observer to perform
- * batch deletes.
+ * batch deletes.
*
*/
public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater();
-
- public SpanBatchDeleteInformation(int batchSize, Column column, Span span) {
+ private Optional<String> nodeId;
+
+ /**
+ * Create a new SpanBatchInformation object.
+ * @param nodeId - Optional nodeId that is used to filter returned results. Useful if the shard Id
+ * is not included in the Span (see {@link BindingHashShardingFunction} for more info about how sharded
+ * row keys are generated).
+ * @param batchSize - size of batch to be deleted
+ * @param column - column whose entries will be deleted
+ * @param span - Span indicating the range of data to delete. Sometimes the Span cannot contain the hash
+ * (for example, if you are deleting all of the results associated with a nodeId). In this case, a nodeId
+ * should be specified along with a Span equal to the prefix of the nodeId.
+ * @throws IllegalArgumentException if nodeId, column or span is null and if batchSize <= 0.
+ */
+ public SpanBatchDeleteInformation(Optional<String> nodeId, int batchSize, Column column, Span span) {
super(batchSize, Task.Delete, column, span);
+ checkNotNull(nodeId);
+ this.nodeId = nodeId;
}
-
+
/**
* @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column}
*/
@@ -42,17 +62,42 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
public BatchBindingSetUpdater getBatchUpdater() {
return updater;
}
-
-
+
+ /**
+ * Returns an Optional nodeId. If this value is specified, the results
+ * returned from the Fluo scan over the indicated range will be filtered
+ * by the nodeId. The nodeId allows results for a given query nodeId to be
+ * deleted using a Span even if the hash cannot be specified when forming the
+ * rowId in the table.
+ * @return - the nodeId whose results will be batch deleted
+ */
+ public Optional<String> getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("Span Batch Information {\n")
+ .append(" Span: " + super.getSpan() + "\n")
+ .append(" Batch Size: " + super.getBatchSize() + "\n")
+ .append(" Task: " + super.getTask() + "\n")
+ .append(" Column: " + super.getColumn() + "\n")
+ .append(" NodeId: " + nodeId + "\n")
+ .append("}")
+ .toString();
+ }
+
public static Builder builder() {
return new Builder();
}
-
+
public static class Builder {
private int batchSize = DEFAULT_BATCH_SIZE;
private Column column;
private Span span;
+ private Optional<String> nodeId = Optional.empty();
/**
* @param batchSize - {@link Task}s are applied in batches of this size
@@ -74,19 +119,34 @@ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation {
/**
* @param span - span that batch {@link Task} will be applied to
- *
+ *
*/
public Builder setSpan(Span span) {
this.span = span;
return this;
}
+ /**
+ * Sets the nodeId whose results will be batch deleted. This optional value
+ * allows the {@link SpanBatchBindingSetUpdater} to filter on the indicated
+ * nodeId. Because the results of the Fluo table are sharded, if the Span does
+ * not include the shard, then it is not possible to scan exactly for all results
+ * pertaining to a specific nodeId. In the event that a user wants to delete all nodes
+ * related to a specific entry, this Optional nodeId should be specified to retrieve
+ * only the results associated with the indicated nodeId.
+ * @param nodeId - node whose results will be batch deleted
+ * @return - Builder for chaining method calls
+ */
+ public Builder setNodeId(Optional<String> nodeId) {
+ this.nodeId = nodeId;
+ return this;
+ }
/**
* @return an instance of {@link SpanBatchDeleteInformation} constructed from parameters passed to this Builder
*/
public SpanBatchDeleteInformation build() {
- return new SpanBatchDeleteInformation(batchSize, column, span);
+ return new SpanBatchDeleteInformation(nodeId, batchSize, column, span);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
index 98deb8e..8644c31 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/SpanBatchInformationTypeAdapter.java
@@ -1,4 +1,5 @@
package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
* under the License.
*/
import java.lang.reflect.Type;
+import java.util.Optional;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
@@ -37,10 +39,12 @@ import com.google.gson.JsonSerializer;
* JsonSerializer/JsonDeserializer used to serialize/deserialize {@link SpanBatchDeleteInformation} objects.
*
*/
-public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
+public class SpanBatchInformationTypeAdapter
+ implements JsonSerializer<SpanBatchDeleteInformation>, JsonDeserializer<SpanBatchDeleteInformation> {
@Override
- public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ public SpanBatchDeleteInformation deserialize(JsonElement element, Type typeOfT, JsonDeserializationContext context)
+ throws JsonParseException {
JsonObject json = element.getAsJsonObject();
int batchSize = json.get("batchSize").getAsInt();
String[] colArray = json.get("column").getAsString().split("\u0000");
@@ -49,7 +53,12 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch
boolean startInc = json.get("startInc").getAsBoolean();
boolean endInc = json.get("endInc").getAsBoolean();
Span span = new Span(new RowColumn(rows[0]), startInc, new RowColumn(rows[1]), endInc);
- return SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setSpan(span).setColumn(column).build();
+ String nodeId = json.get("nodeId").getAsString();
+ Optional<String> id = Optional.empty();
+ if (!nodeId.isEmpty()) {
+ id = Optional.of(nodeId);
+ }
+ return SpanBatchDeleteInformation.builder().setNodeId(id).setBatchSize(batchSize).setSpan(span).setColumn(column).build();
}
@Override
@@ -63,6 +72,8 @@ public class SpanBatchInformationTypeAdapter implements JsonSerializer<SpanBatch
result.add("span", new JsonPrimitive(span.getStart().getsRow() + "\u0000" + span.getEnd().getsRow()));
result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+ String nodeId = batch.getNodeId().orElse("");
+ result.add("nodeId", new JsonPrimitive(nodeId));
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
index e33ea97..20a6b97 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.export.rya;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.addTriplePrefixAndConvertToBytes;
import java.util.Collection;
import java.util.Map;
@@ -54,11 +55,11 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
private static final Logger log = Logger.getLogger(RyaSubGraphExporter.class);
private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
private final FluoClient fluo;
-
+
public RyaSubGraphExporter(FluoClient fluo) {
this.fluo = Preconditions.checkNotNull(fluo);
}
-
+
@Override
public Set<QueryType> getQueryTypes() {
return Sets.newHashSet(QueryType.CONSTRUCT);
@@ -78,12 +79,12 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
insertTriples(fluo.newTransaction(), subgraph.getStatements());
}
-
+
private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
for (final RyaStatement triple : triples) {
Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
try {
- tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+ tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
} catch (final TripleRowResolverException e) {
log.error("Could not convert a Triple into the SPO format: " + triple);
}
@@ -97,10 +98,10 @@ public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
* @return The Rya SPO representation of the triple.
* @throws TripleRowResolverException The triple could not be converted.
*/
- private static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+ private static Bytes spoFormat(final RyaStatement triple) throws TripleRowResolverException {
checkNotNull(triple);
final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
- return spoRow.getRow();
+ return addTriplePrefixAndConvertToBytes(spoRow.getRow());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
index 6147fa8..a8c4d58 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -54,8 +55,8 @@ public class AggregationObserver extends BindingSetUpdater {
requireNonNull(tx);
requireNonNull(row);
- // Fetch the Aggregation node's metadata.
- final String nodeId = BindingSetRow.make(row).getNodeId();
+ // Make nodeId and fetch the Aggregation node's metadata.
+ final String nodeId = BindingSetRow.makeFromShardedRow(Bytes.of(AGGREGATION_PREFIX), row).getNodeId();
final AggregationMetadata metadata = queryDao.readAggregationMetadata(tx, nodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index 09d9ede..01c9d73 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -18,6 +18,8 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.observers;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
+
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -25,12 +27,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
/**
* Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
@@ -42,7 +44,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
public class ConstructQueryResultObserver extends AbstractObserver {
private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
- protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
+ private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
@Override
public ObservedColumn getObservedColumn() {
@@ -53,18 +55,18 @@ public class ConstructQueryResultObserver extends AbstractObserver {
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
//Build row for parent that result will be written to
- BindingSetRow bsRow = BindingSetRow.make(row);
+ BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(Bytes.of(CONSTRUCT_PREFIX), row);
String constructNodeId = bsRow.getNodeId();
String bsString= bsRow.getBindingSetString();
String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
- String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
+ Bytes rowBytes = BindingHashShardingFunction.getShardedScanPrefix(parentNodeId, bsString);
//Get NodeType of the parent node
NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
//Get data for the ConstructQuery result
Bytes bytes = tx.get(row, col);
//Write result to parent
- tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
+ tx.set(rowBytes, parentType.getResultColumn(), bytes);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index b4edfea..844343c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -49,7 +50,7 @@ public class FilterObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Filter metadata.
- final String filterNodeId = BindingSetRow.make(row).getNodeId();
+ final String filterNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(FILTER_PREFIX), row).getNodeId();
final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index c56a98f..f3f409e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -49,7 +50,7 @@ public class JoinObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Join metadata.
- final String joinNodeId = BindingSetRow.make(row).getNodeId();
+ final String joinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(JOIN_PREFIX), row).getNodeId();
final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
index 7d96baa..87d0ca2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -53,7 +54,7 @@ public class PeriodicQueryObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Join metadata.
- final String periodicBinNodeId = BindingSetRow.make(row).getNodeId();
+ final String periodicBinNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PERIODIC_QUERY_PREFIX), row).getNodeId();
final PeriodicQueryMetadata periodicBinMetadata = queryDao.readPeriodicQueryMetadata(tx, periodicBinNodeId);
// Read the Visibility Binding Set from the Value.
@@ -65,6 +66,6 @@ public class PeriodicQueryObserver extends BindingSetUpdater {
return new Observation(periodicBinNodeId, periodicBinBindingSet, parentNodeId);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
index 5d73b2e..b77bf91 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -48,7 +49,7 @@ public class ProjectionObserver extends BindingSetUpdater {
requireNonNull(row);
// Read the Filter metadata.
- final String projectionNodeId = BindingSetRow.make(row).getNodeId();
+ final String projectionNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(PROJECTION_PREFIX), row).getNodeId();
final ProjectionMetadata projectionMetadata = queryDao.readProjectionMetadata(tx, projectionNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 78d0ec5..7fa4d38 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -18,13 +18,14 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.observers;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_BINDING_SET;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
@@ -50,7 +51,7 @@ import com.google.common.collect.ImmutableSet;
public class QueryResultObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class);
- protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
+ private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
/**
* Builders for each type of {@link IncrementalBindingSetExporter} we support.
*/
@@ -97,10 +98,9 @@ public class QueryResultObserver extends AbstractObserver {
@Override
public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception {
- final String row = brow.toString();
// Read the queryId from the row and get the QueryMetadata.
- final String queryId = row.split(NODEID_BS_DELIM)[0];
+ final String queryId = BindingSetRow.makeFromShardedRow(Bytes.of(QUERY_PREFIX), brow).getNodeId();
final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId);
// Read the Child Binding Set that will be exported.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 607267a..e3c5b95 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static java.util.Objects.requireNonNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -48,8 +49,8 @@ public class StatementPatternObserver extends BindingSetUpdater {
requireNonNull(tx);
requireNonNull(row);
- // Read the Statement Pattern metadata.
- final String spNodeId = BindingSetRow.make(row).getNodeId();
+ // Make nodeId and get the Statement Pattern metadata.
+ final String spNodeId = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), row).getNodeId();
final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId);
// Read the Visibility Binding Set from the value.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index d6fd8bd..83517bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -19,24 +19,23 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
import java.util.Map;
+import java.util.Set;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -44,7 +43,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
/**
@@ -56,11 +54,10 @@ public class TripleObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(TripleObserver.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
+ private final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
+ private final StatementPatternIdCache SP_ID_CACHE = StatementPatternIdCacheSupplier.getOrCreateCache();
private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
- public TripleObserver() {}
-
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoQueryColumns.TRIPLES, NotificationType.STRONG);
@@ -71,53 +68,45 @@ public class TripleObserver extends AbstractObserver {
// Get string representation of triple.
final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow);
log.trace("Transaction ID: {}\nRya Statement: {}\n", tx.getStartTimestamp(), ryaStatement);
+ log.trace("Beginging to process triple.");
final String triple = IncUpdateDAO.getTripleString(ryaStatement);
- // Iterate over each of the Statement Patterns that are being matched against.
- final RowScanner spScanner = tx.scanner()
- .over(Span.prefix(SP_PREFIX))
-
- // Only fetch rows that have the pattern in them. There will only be a single row with a pattern per SP.
- .fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN)
- .byRow()
- .build();
+ Set<String> spIDs = SP_ID_CACHE.getStatementPatternIds(tx);
//see if triple matches conditions of any of the SP
- for (final ColumnScanner colScanner : spScanner) {
- // Get the Statement Pattern's node id.
- final String spID = colScanner.getsRow();
-
+ for (String spID: spIDs) {
// Fetch its metadata.
final StatementPatternMetadata spMetadata = QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID);
+ log.trace("Retrieved metadata: {}", spMetadata);
+
// Attempt to match the triple against the pattern.
final String pattern = spMetadata.getStatementPattern();
final VariableOrder varOrder = spMetadata.getVariableOrder();
final String bindingSetString = getBindingSet(triple, pattern, varOrder);
+ log.trace("Created binding set match string: {}", bindingSetString);
+
// Statement matches to a binding set.
if(bindingSetString.length() != 0) {
// Fetch the triple's visibility label.
final String visibility = tx.gets(brow.toString(), FluoQueryColumns.TRIPLES, "");
- // Create the Row ID for the emitted binding set. It does not contain visibilities.
- final String row = spID + NODEID_BS_DELIM + bindingSetString;
- final Bytes rowBytes = Bytes.of( row.getBytes(Charsets.UTF_8) );
+ //Make BindingSet and sharded row
+ final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
+ visBindingSet.setVisibility(visibility);
+ Bytes row = BindingHashShardingFunction.addShard(spID, varOrder, visBindingSet);
// If this is a new Binding Set, then emit it.
- if(tx.get(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
- // Create the Binding Set that goes in the Node Value. It does contain visibilities.
- final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
- visBindingSet.setVisibility(visibility);
-
+ if(tx.get(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
try {
final Bytes valueBytes = BS_SERDE.serialize(visBindingSet);
log.trace("Transaction ID: {}\nMatched Statement Pattern: {}\nBinding Set: {}\n",
tx.getStartTimestamp(), spID, visBindingSet);
- tx.set(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
+ tx.set(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
} catch(final Exception e) {
log.error("Couldn't serialize a Binding Set. This value will be skipped.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 6ca0e8d..c30843d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -53,7 +53,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <table border="1" style="width:100%">
* <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr>
* <tr> <td>Node ID</td> <td>projectionMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr>
- * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>*
+ * <tr> <td>Node ID</td> <td>projectionMetadata:projectedVars</td> <td>The variables that results are projected onto.</td> </tr>*
* <tr> <td>Node ID</td> <td>projectionMetadata:variableOrder</td> <td>The Variable Order that Binding values are written in in the Row to identify solutions.</td> </tr>
* <tr> <td>Node ID</td> <td>projectionMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr>
* <tr> <td>Node ID</td> <td>projectionMetadata:parentNodeId</td> <td>The Node ID of the parent of this node.</td> </tr>
@@ -109,7 +109,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
* <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr>
- * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr>
+ * <tr> <td>Node ID</td> <td>joinMetadata:joinBatchSize</td> <td>Batch size used for processing joins</td> </tr>
* <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr>
* </table>
* </p>
@@ -171,7 +171,7 @@ public class FluoQueryColumns {
public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet");
public static final Column QUERY_EXPORT_STRATEGIES = new Column(QUERY_METADATA_CF, "exportStrategies");
public static final Column QUERY_TYPE = new Column(QUERY_METADATA_CF, "queryType");
-
+
// Query Metadata columns.
public static final Column PROJECTION_NODE_ID = new Column(PROJECTION_METADATA_CF, "nodeId");
public static final Column PROJECTION_PROJECTED_VARS = new Column(PROJECTION_METADATA_CF, "projectedVars");
@@ -195,7 +195,7 @@ public class FluoQueryColumns {
public static final Column FILTER_PARENT_NODE_ID = new Column(FILTER_METADATA_CF, "parentNodeId");
public static final Column FILTER_CHILD_NODE_ID = new Column(FILTER_METADATA_CF, "childNodeId");
public static final Column FILTER_BINDING_SET = new Column(FILTER_METADATA_CF, "bindingSet");
-
+
// Periodic Bin Metadata columns.
public static final Column PERIODIC_QUERY_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "nodeId");
public static final Column PERIODIC_QUERY_VARIABLE_ORDER = new Column(PERIODIC_QUERY_METADATA_CF, "variableOrder");
@@ -206,7 +206,7 @@ public class FluoQueryColumns {
public static final Column PERIODIC_QUERY_WINDOWSIZE = new Column(PERIODIC_QUERY_METADATA_CF, "windowSize");
public static final Column PERIODIC_QUERY_TIMEUNIT = new Column(PERIODIC_QUERY_METADATA_CF, "timeUnit");
public static final Column PERIODIC_QUERY_TEMPORAL_VARIABLE = new Column(PERIODIC_QUERY_METADATA_CF, "temporalVariable");
-
+
// Join Metadata columns.
public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId");
public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder");
@@ -246,6 +246,13 @@ public class FluoQueryColumns {
public static final Column BATCH_COLUMN = new Column("batch","information");
/**
+ * Column indicating a set of all StatementPattern ids in the Fluo table. This is used
+ * by the Triple of Observer for finding new queries to match incoming triple to.
+ */
+ public static final Column STATEMENT_PATTERN_IDS = new Column("statementPattern", "ids");
+ public static final Column STATEMENT_PATTERN_IDS_HASH = new Column("statementPattern", "hash");
+
+ /**
* Enumerates the {@link Column}s that hold all of the fields for each type
* of node that can compose a query.
*/
@@ -261,7 +268,7 @@ public class FluoQueryColumns {
QUERY_TYPE,
QUERY_EXPORT_STRATEGIES,
QUERY_CHILD_NODE_ID)),
-
+
/**
* The columns a {@link ProjectionMetadata} object's fields are stored within.
*/
@@ -271,8 +278,8 @@ public class FluoQueryColumns {
PROJECTION_VARIABLE_ORDER,
PROJECTION_PARENT_NODE_ID,
PROJECTION_CHILD_NODE_ID)),
-
-
+
+
/**
* The columns a {@link PeriodicBinMetadata} object's fields are stored within.
*/
@@ -297,7 +304,7 @@ public class FluoQueryColumns {
CONSTRUCT_PARENT_NODE_ID,
CONSTRUCT_STATEMENTS)),
-
+
/**
* The columns a {@link FilterMetadata} object's fields are stored within.
*/
@@ -317,7 +324,7 @@ public class FluoQueryColumns {
JOIN_TYPE,
JOIN_PARENT_NODE_ID,
JOIN_LEFT_CHILD_NODE_ID,
- JOIN_BATCH_SIZE,
+ JOIN_BATCH_SIZE,
JOIN_RIGHT_CHILD_NODE_ID)),
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
index 8adc40d..b1b4076 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
@@ -18,8 +18,7 @@
*/package org.apache.rya.indexing.pcj.fluo.app.query;
import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.concurrent.Callable;
+import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.data.Bytes;
@@ -32,15 +31,18 @@ import com.google.common.base.Optional;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+
/**
* Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
* checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
- * data.
+ * data. The cache has a fixed capacity (determined at construction time), and evicts the least recently used entries
+ * when space is needed.
*
*/
public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+
private final FluoQueryMetadataDAO dao;
private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache;
private final Cache<String, Bytes> metadataCache;
@@ -49,10 +51,15 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
/**
* Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
- *
* @param capacity - max size of the cache
+ * @param concurrencyLevel - indicates how the cache will be partitioned to that different threads can access those
+ * partitions in a non-serialized manner
+ * @throws IllegalArgumentException if dao is null, capacity <= 0, or concurrencyLevel <= 0
*/
public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+ checkNotNull(dao);
+ checkArgument(capacity > 0);
+ checkArgument(concurrencyLevel > 0);
this.dao = dao;
commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
@@ -75,166 +82,204 @@ public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
return concurrencyLevel;
}
+
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#STATEMENT_PATTERN}.
+ */
@Override
public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
-
+ checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
- return dao.readStatementPatternMetadata(tx, nodeId);
- }
+ return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+ return dao.readStatementPatternMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#JOIN}.
+ */
@Override
public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
LOG.debug("Retrieving Metadata from Cache: {}.", nodeId);
- return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readJoinMetadata(tx, nodeId);
- }
+ return (JoinMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readJoinMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#FILTER}.
+ */
@Override
public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readFilterMetadata(tx, nodeId);
- }
+ return (FilterMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readFilterMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#PROJECTION}.
+ */
@Override
public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION);
- LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
try {
- return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readProjectionMetadata(tx, nodeId);
- }
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readProjectionMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#AGGREGATION}.
+ */
@Override
public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readAggregationMetadata(tx, nodeId);
- }
+ return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readAggregationMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#CONSTRUCT}.
+ */
@Override
public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readConstructQueryMetadata(tx, nodeId);
- }
+ return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readConstructQueryMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#PERIODIC_QUERY}.
+ */
@Override
public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readPeriodicQueryMetadata(tx, nodeId);
- }
+ return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readPeriodicQueryMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * @throws IllegalArgumentException if tx or nodeId is null, and if {@link NodeType#fromNodeId(String)}
+ * does not return an Optional containing {@link NodeType#QUERY}.
+ */
@Override
public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) {
+ checkNotNull(nodeId);
+ checkNotNull(tx);
Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
try {
- checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
- return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
- @Override
- public CommonNodeMetadata call() throws Exception {
- LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
- return dao.readQueryMetadata(tx, nodeId);
- }
+ return (QueryMetadata) commonNodeMetadataCache.get(nodeId, () -> {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readQueryMetadata(tx, nodeId);
});
} catch (Exception e) {
throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e);
}
}
+ /**
+ * Reads specific metadata entries from the cache. This method will retrieve the entry
+ * from the Fluo table if it does not already exist in the cache.
+ * @param tx - Transaction for interacting with Fluo
+ * @param rowId - rowId for metadata entry
+ * @param column - column of metadata entry
+ * @return - value associated with the metadata entry
+ */
public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) {
+ checkNotNull(rowId);
+ checkNotNull(tx);
+ checkNotNull(column);
Optional<NodeType> type = NodeType.fromNodeId(rowId);
+ checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
try {
- checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
- return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() {
- @Override
- public Bytes call() throws Exception {
- return tx.get(Bytes.of(rowId), column);
- }
- });
+ return metadataCache.get(getKey(rowId, column), () -> tx.get(Bytes.of(rowId), column));
} catch (Exception e) {
throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e);
}
}
+ /**
+ * Deletes contents of cache.
+ */
+ public void clear() {
+ commonNodeMetadataCache.asMap().clear();
+ metadataCache.asMap().clear();
+ }
+
private String getKey(String row, Column column) {
return row + ":" + column.getsQualifier() + ":" + column.getsQualifier();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index c132ad4..55e521e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -47,6 +47,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -617,15 +618,19 @@ public class FluoQueryMetadataDAO {
write(tx, join);
}
+ Set<String> ids = new HashSet<>();
for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) {
write(tx, statementPattern);
+ ids.add(statementPattern.getNodeId());
}
+ StatementPatternIdManager.addStatementPatternIds(tx, Sets.newHashSet(ids));
for(final AggregationMetadata aggregation : query.getAggregationMetadata()) {
write(tx, aggregation);
}
}
+
/**
* Read an instance of {@link FluoQuery} from the Fluo table.
*
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
index faab952..761100d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
@@ -1,8 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.rya.indexing.pcj.fluo.app.query;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Manages the creation of the {@link FluoQueryMetadataCache} in the Fluo application.
+ * This supplier enforces singleton like behavior in that it will only create the cache if it
+ * doesn't already exist. The FluoQueryMetadataCache is not a singleton in itself.
+ */
public class MetadataCacheSupplier {
private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
@@ -10,6 +35,7 @@ public class MetadataCacheSupplier {
private static boolean initialized = false;
private static final int DEFAULT_CAPACITY = 10000;
private static final int DEFAULT_CONCURRENCY = 8;
+ private static final ReentrantLock lock = new ReentrantLock();
/**
* Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the
@@ -19,21 +45,26 @@ public class MetadataCacheSupplier {
* @param concurrencyLevel - concurrencyLevel used to create a new cache
*/
public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) {
- if (!initialized) {
- LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
- concurrencyLevel);
- CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
- initialized = true;
- } else {
- LOG.debug("Cache has already been initialized. Returning cache with capacity: {} and concurrencylevel: {}",
- CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+ lock.lock();
+ try {
+ if (!initialized) {
+ LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
+ concurrencyLevel);
+ CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
+ initialized = true;
+ } else {
+ LOG.warn(
+ "A cache has already been initialized, so a cache with capacity: {} and concurrency level: {} will not be created. Returning existing cache with capacity: {} and concurrencylevel: {}",
+ capacity, concurrencyLevel, CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+ }
+ return CACHE;
+ } finally {
+ lock.unlock();
}
- return CACHE;
}
/**
- * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it
- * with a default size of 10000 entries and a default concurrency level of 8.
+ * Creates a FluoQueryMetadataCache with a default size of 10000 entries and a default concurrency level of 8.
*
* @return - FluoQueryMetadataCache with default instance name and default capacity and concurrency
*/
@@ -41,4 +72,21 @@ public class MetadataCacheSupplier {
return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY);
}
+ /**
+ * Clears contents of cache and makes supplier uninitialized so that it creates a new cache.
+ * This is useful for integration tests.
+ */
+ public static void clear() {
+ lock.lock();
+ try{
+ if(initialized) {
+ CACHE.clear();
+ CACHE = null;
+ initialized = false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
new file mode 100644
index 0000000..f1ddb02
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class caches the StatementPattern Ids so they don't have
+ * to be looked up each time a new Statement needs to be processed
+ * in the TripleObserver.
+ *
+ */
+public class StatementPatternIdCache {
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private static Optional<String> HASH = Optional.empty();
+ private static Set<String> IDS = new HashSet<>();
+
+ /**
+ * This method retrieves the StatementPattern NodeIds registered in the Fluo table.
+ * To determine whether the StatementPattern NodeIds have changed in the underlying Fluo table,
+ * this class maintains a local hash of the ids. When this method is called, it looks up the
+ * hash of the StatementPattern Id Strings in the Fluo table, and only if it is different
+ * than the local hash will the StatementPattern nodeIds be retrieved from the Fluo table. Otherwise,
+ * this method returns a local cache of the StatementPattern nodeIds. This method is thread safe.
+ * @param tx
+ * @return - Set of StatementPattern nodeIds
+ */
+ public Set<String> getStatementPatternIds(TransactionBase tx) {
+ checkNotNull(tx);
+ Optional<Bytes> hashBytes = Optional.ofNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH));
+ if (hashBytes.isPresent()) {
+ String hash = hashBytes.get().toString();
+ if ((HASH.isPresent() && HASH.get().equals(hash))) {
+ return IDS;
+ }
+ lock.lock();
+ try {
+ String ids = tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS).toString();
+ IDS = Sets.newHashSet(ids.split(VAR_DELIM));
+ HASH = Optional.of(hash);
+ return IDS;
+ } finally {
+ lock.unlock();
+ }
+ }
+ return IDS;
+ }
+
+ /**
+ * Clears contexts of cache so that it will be re-populated next time
+ * {@link StatementPatternIdCache#getStatementPatternIds(TransactionBase)} is called.
+ */
+ public void clear() {
+ HASH = Optional.empty();
+ IDS.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
new file mode 100644
index 0000000..01264dc
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the creation of the {@link StatementPatternIdCache} in the Fluo application.
+ * This supplier enforces singleton like behavior in that it will only create the cache if it
+ * doesn't already exist. The StatementPatternIdCache is not a singleton in itself.
+ */
+public class StatementPatternIdCacheSupplier {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StatementPatternIdCacheSupplier.class);
+ private static boolean initialized = false;
+ private static StatementPatternIdCache CACHE;
+ private static final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Returns an existing cache if one has been created, otherwise creates a new cache.
+ *
+ * @return - existing StatementPatternIdCache or new cache if one didn't already exist
+ */
+ public static StatementPatternIdCache getOrCreateCache() {
+ lock.lock();
+ try {
+ if (!initialized) {
+ LOG.debug("Cache has not been initialized. Initializing StatementPatternIdCache");
+ CACHE = new StatementPatternIdCache();
+ initialized = true;
+ } else {
+ LOG.debug("A StatementPatternIdCache has already been initialized.");
+ }
+ return CACHE;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Deletes stored cache and flags Supplier as uninitialized.
+ */
+ public static void clear() {
+ lock.lock();
+ try {
+ if (initialized) {
+ CACHE.clear();
+ CACHE = null;
+ initialized = false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
new file mode 100644
index 0000000..ee4c053
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+ /**
+ * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also
+ * updates the hash of the updated nodeId Set and writes that to the Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx - Fluo Transaction object for performing atomic operations on Fluo table.
+ * @param ids - ids to add to the StatementPattern nodeId Set
+ */
+ public static void addStatementPatternIds(TransactionBase tx, Set<String> ids) {
+ checkNotNull(tx);
+ checkNotNull(ids);
+ Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS));
+ StringBuilder builder = new StringBuilder();
+ if (val.isPresent()) {
+ builder.append(val.get().toString());
+ builder.append(VAR_DELIM);
+ }
+ String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString();
+ tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
+ tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+ }
+
+ /**
+ * Remove specified Set of ids from the Fluo table and updates the entry with Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that
+ * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx - Fluo Transaction object for performing atomic operations on Fluo table.
+ * @param ids - ids to remove from the StatementPattern nodeId Set
+ */
+ public static void removeStatementPatternIds(TransactionBase tx, Set<String> ids) {
+ checkNotNull(tx);
+ checkNotNull(ids);
+ Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS));
+ Set<String> storedIds = new HashSet<>();
+ if (val.isPresent()) {
+ storedIds = Sets.newHashSet(val.get().toString().split(VAR_DELIM));
+ }
+ storedIds.removeAll(ids);
+ String idString = Joiner.on(VAR_DELIM).join(ids);
+ tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
+ tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+ }
+
+}
[4/4] incubator-rya git commit: RYA-406. Closes #251.
Posted by ca...@apache.org.
RYA-406. Closes #251.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/62de7c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/62de7c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/62de7c5d
Branch: refs/heads/master
Commit: 62de7c5d16ac6624228fd2c03f78b09845c1e74b
Parents: 6d2bfcb
Author: Caleb Meier <ca...@parsons.com>
Authored: Mon Nov 6 12:23:57 2017 -0800
Committer: Caleb Meier <ca...@parsons.com>
Committed: Tue Dec 5 11:47:45 2017 -0800
----------------------------------------------------------------------
.../src/main/java/RyaClientExample.java | 3 +-
.../notification/pruner/FluoBinPruner.java | 9 +-
.../rya.manual/src/site/markdown/pcj-updater.md | 186 +++++++++++++++++++
.../indexing/pcj/fluo/api/DeleteFluoPcj.java | 59 +++---
.../indexing/pcj/fluo/api/GetQueryReport.java | 21 ++-
.../indexing/pcj/fluo/api/InsertTriples.java | 26 ++-
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 40 ++--
.../pcj/fluo/app/AbstractNodeUpdater.java | 45 +++++
.../pcj/fluo/app/AggregationResultUpdater.java | 5 +-
.../indexing/pcj/fluo/app/BindingSetRow.java | 41 +++-
.../fluo/app/ConstructQueryResultUpdater.java | 13 +-
.../pcj/fluo/app/FilterResultUpdater.java | 5 +-
.../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 11 +-
.../fluo/app/IncrementalUpdateConstants.java | 21 ++-
.../pcj/fluo/app/JoinResultUpdater.java | 62 +++----
.../pcj/fluo/app/PeriodicQueryUpdater.java | 21 +--
.../pcj/fluo/app/ProjectionResultUpdater.java | 10 +-
.../pcj/fluo/app/QueryResultUpdater.java | 5 +-
.../batch/AbstractBatchBindingSetUpdater.java | 6 +-
.../app/batch/AbstractSpanBatchInformation.java | 8 +-
.../app/batch/JoinBatchBindingSetUpdater.java | 16 +-
.../app/batch/SpanBatchBindingSetUpdater.java | 60 ++++--
.../app/batch/SpanBatchDeleteInformation.java | 78 +++++++-
.../SpanBatchInformationTypeAdapter.java | 17 +-
.../app/export/rya/RyaSubGraphExporter.java | 13 +-
.../fluo/app/observers/AggregationObserver.java | 9 +-
.../observers/ConstructQueryResultObserver.java | 12 +-
.../pcj/fluo/app/observers/FilterObserver.java | 3 +-
.../pcj/fluo/app/observers/JoinObserver.java | 3 +-
.../app/observers/PeriodicQueryObserver.java | 5 +-
.../fluo/app/observers/ProjectionObserver.java | 3 +-
.../fluo/app/observers/QueryResultObserver.java | 8 +-
.../app/observers/StatementPatternObserver.java | 5 +-
.../pcj/fluo/app/observers/TripleObserver.java | 49 ++---
.../pcj/fluo/app/query/FluoQueryColumns.java | 27 ++-
.../fluo/app/query/FluoQueryMetadataCache.java | 181 +++++++++++-------
.../fluo/app/query/FluoQueryMetadataDAO.java | 5 +
.../fluo/app/query/MetadataCacheSupplier.java | 70 +++++--
.../fluo/app/query/StatementPatternIdCache.java | 89 +++++++++
.../query/StatementPatternIdCacheSupplier.java | 74 ++++++++
.../app/query/StatementPatternIdManager.java | 90 +++++++++
.../app/util/BindingHashShardingFunction.java | 178 ++++++++++++++++++
.../pcj/fluo/app/util/TriplePrefixUtils.java | 62 +++++++
.../app/query/FluoQueryMetadataCacheTest.java | 18 ++
.../app/query/StatementPatternIdCacheTest.java | 55 ++++++
.../util/BindingHashShardingFunctionTest.java | 66 +++++++
.../fluo/app/util/TriplePrefixUtilsTest.java | 37 ++++
.../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 29 +--
.../indexing/pcj/fluo/integration/BatchIT.java | 129 +++++++------
.../pcj/fluo/integration/CreateDeleteIT.java | 20 +-
.../integration/CreateDeletePeriodicPCJ.java | 7 +-
.../pcj/fluo/integration/FluoLatencyIT.java | 169 -----------------
.../indexing/pcj/fluo/integration/QueryIT.java | 32 ++--
.../integration/StatementPatternIdCacheIT.java | 82 ++++++++
.../src/test/resources/log4j.properties | 43 +++++
.../rya/pcj/fluo/test/base/FluoITBase.java | 41 ++--
.../pcj/fluo/test/base/KafkaExportITBase.java | 13 +-
.../pcj/functions/geo/GeoFunctionsIT.java | 14 +-
58 files changed, 1778 insertions(+), 631 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/indexingExample/src/main/java/RyaClientExample.java
----------------------------------------------------------------------
diff --git a/extras/indexingExample/src/main/java/RyaClientExample.java b/extras/indexingExample/src/main/java/RyaClientExample.java
index 3f42f1d..278f214 100644
--- a/extras/indexingExample/src/main/java/RyaClientExample.java
+++ b/extras/indexingExample/src/main/java/RyaClientExample.java
@@ -44,6 +44,7 @@ import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.accumulo.AccumuloIndexingConfiguration;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
@@ -75,7 +76,6 @@ public class RyaClientExample {
private static final Logger log = Logger.getLogger(RyaClientExample.class);
public static void main(final String[] args) throws Exception {
- setupLogging();
final String accumuloUsername = "root";
final String accumuloPassword = "password";
@@ -241,6 +241,7 @@ public class RyaClientExample {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverSpecification> observers = new ArrayList<>();
observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+ observers.add(new ObserverSpecification(BatchObserver.class.getName()));
observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
index 0562180..ea08af5 100644
--- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
@@ -23,12 +23,14 @@ import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.NodeBin;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.BindingSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import com.google.common.base.Optional;
public class FluoBinPruner implements BinPruner {
private static final Logger log = LoggerFactory.getLogger(FluoBinPruner.class);
+ private static final ValueFactory vf = new ValueFactoryImpl();
private final FluoClient client;
public FluoBinPruner(final FluoClient client) {
@@ -67,9 +70,9 @@ public class FluoBinPruner implements BinPruner {
throw new RuntimeException();
}
final Column batchInfoColumn = type.get().getResultColumn();
- final String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
+ final Bytes batchInfoSpanPrefix = BindingHashShardingFunction.getShardedScanPrefix(id, vf.createLiteral(bin));
final SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
- .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build();
+ .setSpan(Span.prefix(batchInfoSpanPrefix)).build();
BatchInformationDAO.addBatch(tx, id, batchInfo);
tx.commit();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.manual/src/site/markdown/pcj-updater.md
----------------------------------------------------------------------
diff --git a/extras/rya.manual/src/site/markdown/pcj-updater.md b/extras/rya.manual/src/site/markdown/pcj-updater.md
index 11cb560..157bb13 100644
--- a/extras/rya.manual/src/site/markdown/pcj-updater.md
+++ b/extras/rya.manual/src/site/markdown/pcj-updater.md
@@ -508,6 +508,192 @@ will fail. If that occurs, look up the YARN Application-Id in the YARN UI,
or with the command `yarn application -list` and then kill it with a command
similar to: `yarn application -kill application_1503402439867_0009`.
+## Performance Optimizations
+There are a number of optimizations that can boost the performance of the Rya PCJ Updater. The main
+bottleneck that will prevent an instance of the PCJ Updater from scaling is the load that the application
+places on the Accumulo Tablet Servers. In an effort to mitigate this, there are a number of things that can be done to
+lighten the scan load on each Tablet Server and cut down on the number of scans overall.
+
+#### Metadata Caching
+The PCJ Updater uses metadata associated with each query node to route and process intermediate query results.
+New queries can be dynamically added to and deleted from the Rya PCJ Updater, but for the most part this data
+is static and can be cached. So the PCJ Updater aggressively caches whatever metadata it can to avoid lookups. In addition,
+each time the Updater processes new statements, it must match the new triples with StatementPatterns registered with the
+PCJ Updater. The ids of these StatementPatterns are also cached to avoid costly scans for new StatementPatterns. All
+metadata caches are active and utilized by default.
+
+#### Load Balancing and Sharding
+Another important optimization that can drastically boost performance is sharding. Sharding ensures that
+the processing and scanning load is equally distributed among all Tablet Servers. By default, the PCJ Updater
+shards its rows. However, to take advantage of this optimization, the user must pre-split the Fluo table after
+initializing the application. To do this, add the following properties to the fluo.properties file for the application
+before initializing:
+
+```
+# RowHasher Split Properties
+# -------------------
+fluo.app.recipes.optimizations.SP=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.J=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.A=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.PR=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+fluo.app.recipes.optimizations.Q=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
+
+fluo.app.recipes.rowHasher.SP.numTablets=8
+fluo.app.recipes.rowHasher.J.numTablets=8
+fluo.app.recipes.rowHasher.A.numTablets=8
+fluo.app.recipes.rowHasher.PR.numTablets=8
+fluo.app.recipes.rowHasher.Q.numTablets=8
+
+```
+
+The above properties are used by the Fluo RowHasher recipe, which splits the Fluo
+table along each specified prefix. The properties above indicate that the Fluo table
+will be split across 8 tablets along each of the prefixes SP, J, A, PR, and Q. Each of these
+prefixes correspond to prefixes of distinct result ranges stored in the Rya PCJ Updater.
+The choice of 8 tablets is arbitrary and should be based on available resources. After
+adding the above properties and initializing the application, execute the following command:
+
+```
+./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable
+
+```
+
+This command generates and applies the splits indicated by above properties. See
+the Fluo [row hash prefix recipe](https://fluo.apache.org/docs/fluo-recipes/1.1.0-incubating/row-hasher/)
+for more details.
+
+#### Compaction Strategies for Garbage Collecting
+The PCJ Updater application retains processed notifications and triples that
+are marked for deletion until a minor or major compaction runs and triggers the
+Fluo Garbage Collection iterator. Because new Triples are processed by the TripleObserver
+and then immediately marked as deleted, it's quite possible that a large number of "deleted" triples
+could build up before they are actually removed from the table. Similarly,
+old notifications that have already been processed and marked as deleted can pile up as well. As these entries
+build up in the Fluo table, Tablet Servers have to scan over these entries when the
+Fluo NotificationIterator is run, creating extra work for the Tablet Servers. To count the number
+of "DELETED" notifications and triples that are in the table, run the following commands:
+
+```
+ #counts number of old notifications
+ ./bin/fluo scan <app_name> --raw -c ntfy | grep -c 'DELETE'
+ #counts number of deleted triples
+ ./bin/fluo scan <app_name> --raw -c triples | grep -c 'DELETE'
+```
+
+It's good practice to monitor how these quantities grow after starting the application to get
+a sense of whether or not Accumulo compactions are being executed frequently enough. One possible
+optimization to increase the compaction rate is to adjust the compaction ratio through the Accumulo shell.
+After initializing the PCJ Updater application, execute the following command in the Accumulo shell:
+
+```
+config -t <app_table_name> -s table.compaction.major.ratio=1.0
+```
+
+This sets the major compaction ratio of the Fluo table to 1.0, where the lower the ratio, the more frequently
+major compactions will occur. Another approach is to compact on a specified Range. Fluo supports
+periodically compacting on specified ranges. To do this, add any of the following hex ranges to the
+fluo.properties file before initializing the PCJ Updater.
+
+```
+#Rya PCJ Updater Compaction ranges
+#Compact all triples
+fluo.app.recipes.transientRange.triples=543C3C3A3E3E:543C3C3A3E3EFF
+#Compact all statement pattern results
+fluo.app.recipes.transientRange.statementPattern=53503C3C3A3E3E:53503C3C3A3E3EFF
+#Compact all join results
+fluo.app.recipes.transientRange.join=4A3C3C3A3E3E:4A3C3C3A3E3EFF
+#Compact all aggregation results
+fluo.app.recipes.transientRange.aggregation=413C3C3A3E3E:413C3C3A3E3EFF
+#Compact all projection results
+fluo.app.recipes.transientRange.projection=50523C3C3A3E3E:50523C3C3A3E3EFF
+#Compact full table
+fluo.app.recipes.transientRange.fullTable=:FF
+```
+To apply the above transient range properties, execute the following Fluo command:
+
+```
+./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> <multiplier>
+```
+
+The above command executes a compaction over each transient range that is included in the fluo.properties file.
+These compactions will run indefinitely and execute every time the compaction period (in ms) elapses. The multiplier
+is an optional parameter that indicates how much the periodic compaction script will throttle compactions if they
+begin taking too long. See the Fluo [transient data recipe](https://fluo.apache.org/docs/fluo-recipes/1.1.0-incubating/transient/)
+for more information.
+
+If running compactions proves to be extremely costly and begins to affect application performance, it's best to
+only do a range compaction on the triples. This will clean up old, deleted triples and any notifications related to triples.
+If the Tablet Servers can handle the additional load, try adding additional ranges to clean up
+old data. Note that when the full table range is added, there is no need to include any of the other
+ranges.
+
+#### Cheatsheet for Applying the Optimizations
+To apply the above optimizations,
+
+1. Stop the Rya PCJ Updater app if it is running by
+executing the following command
+```
+./bin/fluo stop <app_name>
+```
+ 2. Once the application is stopped, add any optimization related properties to the fluo.properties file.
+ These properties include the row sharding properties and the range compaction properties outlined
+ above. Note that it is best to add all of the row shard properties to ensure an even data distribution
+ among all of the Tablet Servers, and start off by adding only the triple transient range property for Range compactions.
+
+ 3. Initialize the Rya PCJ Updater application by executing the following command in Fluo
+ ```
+ ./bin/fluo init <app_name>
+ ```
+ 4. Apply the row shard splits by executing
+ ```
+ ./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable
+```
+5. Check the PCJ Updater table in the Accumulo UI to ensure that the table has the
+correct number of Tablet Servers
+
+6. Start the application by executing the following command
+```
+./bin/fluo start <app_name>
+```
+7. Apply the transient range compactions by executing the following command
+```
+./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> <multiplier>
+```
+
+#### Monitoring Performance
+Once the application is running, there are a number ways to assess its performance. The
+primary method is to track the number of outstanding notifications that are queued and waiting
+to be processed. This can be done by executing the Fluo wait command
+
+```
+./bin/fluo wait <app_name>
+```
+
+When this script executes, it polls Fluo every ten seconds to determine how many unprocessed notifications
+are queued up. If this number grows over time then the application needs more workers to handle the ingest load.
+
+Before deploying more workers, first verify that the Tablet Servers can handle an increased scan load by checking
+the Accumulo UI to see if the Tablet Servers have any queued scans for the PCJ Updater table. If there are a large
+number of queued scans for the table, adding more workers might not be possible. It may be necessary to lower the
+ingest rate. If the Tablet Servers are keeping up, deploy additional workers by stopping the application, updating
+the fluo.properties to use more workers, and re-initializing and starting the application.
+
+Other ways to assess the performance of the application is to monitor the scan rate through the Accumulo UI. If the scan
+rate is staying approximately constant over time (or increasing very slowly), then the application is performing as expected.
+In general, the scan rate increases because
+
+1. The number of intermediate results increases over time
+2. The number of "deleted" notifications that need to flushed increases over time
+3. The number of "deleted" triples that need to be flushed increases over time
+
+The first item is unavoidable if there is no age off policy for the PCJ Updater application (which is currently the case).
+However, 2 and 3 are avoidable by applying the range compaction optimizations discussed above. So it's important to monitor the
+scan rate (and the number of old triples and notifications as discussed above) to assess the health of the application.
+
+Finally an additional item to monitor is which iterators are running in the Fluo table. This can be done by regularly running the listscans
+command in the Accumulo shell. This gives a sense of how the Tablet Servers are being used. It helps determine whether they are
+spending most of their time finding new notifications (internal Fluo work), or issuing scans that are specific to Rya PCJ Updater Observers.
+
[Apache Fluo]: https://fluo.apache.org/
[Apache Fluo 1.0.0-incubating Documentation]: https://fluo.apache.org/docs/fluo/1.0.0-incubating/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
index 0d97b2f..e0123be 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
@@ -21,19 +21,22 @@ package org.apache.rya.indexing.pcj.fluo.api;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdManager;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.openrdf.query.BindingSet;
@@ -79,7 +82,8 @@ public class DeleteFluoPcj {
* Index. (not null)
* @param pcjId - The PCJ ID for the query that will removed from the Fluo
* application. (not null)
- * @throws UnsupportedQueryException
+ * @throws UnsupportedQueryException - thrown when Fluo app is unable to read FluoQuery associated
+ * with given pcjId.
*/
public void deletePcj(final FluoClient client, final String pcjId) throws UnsupportedQueryException {
requireNonNull(client);
@@ -104,7 +108,8 @@ public class DeleteFluoPcj {
* @param tx - Transaction of a given Fluo table. (not null)
* @param pcjId - Id of query. (not null)
* @return list of Node IDs associated with the query {@code pcjId}.
- * @throws UnsupportedQueryException
+ * @throws UnsupportedQueryException - thrown when Fluo app is unable to read FluoQuery associated
+ * with given pcjId.
*/
private List<String> getNodeIds(Transaction tx, String pcjId) throws UnsupportedQueryException {
requireNonNull(tx);
@@ -131,10 +136,18 @@ public class DeleteFluoPcj {
requireNonNull(pcjId);
try (final Transaction typeTx = tx) {
+ Set<String> spNodeIds = new HashSet<>();
+ //remove metadata associated with each nodeId and store statement pattern nodeIds
for (final String nodeId : nodeIds) {
final NodeType type = NodeType.fromNodeId(nodeId).get();
+ if(type == NodeType.STATEMENT_PATTERN) {
+ spNodeIds.add(nodeId);
+ }
deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns());
}
+ //Use stored statement pattern nodeIds to update list of stored statement pattern nodeIds
+ //in Fluo table
+ StatementPatternIdManager.removeStatementPatternIds(typeTx, spNodeIds);
typeTx.commit();
}
}
@@ -169,37 +182,11 @@ public class DeleteFluoPcj {
final NodeType type = NodeType.fromNodeId(nodeId).get();
Transaction tx = client.newTransaction();
- while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) {
- tx = client.newTransaction();
- }
- }
-
- private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) {
- requireNonNull(tx);
- requireNonNull(nodeId);
- requireNonNull(column);
-
- return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build();
- }
-
- private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) {
- requireNonNull(tx);
- requireNonNull(scanner);
- requireNonNull(column);
-
- try (Transaction ntx = tx) {
- int count = 0;
- final Iterator<RowColumnValue> iter = scanner.iterator();
- while (iter.hasNext() && count < batchSize) {
- final Bytes row = iter.next().getRow();
- count++;
- tx.delete(row, column);
- }
-
- final boolean hasNext = iter.hasNext();
- tx.commit();
- return hasNext;
- }
+ Bytes prefixBytes = Bytes.of(type.getNodeTypePrefix());
+ SpanBatchDeleteInformation batch = SpanBatchDeleteInformation.builder().setColumn(type.getResultColumn())
+ .setSpan(Span.prefix(prefixBytes)).setBatchSize(batchSize).setNodeId(Optional.of(nodeId)).build();
+ BatchInformationDAO.addBatch(tx, nodeId, batch);
+ tx.commit();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
index ddbaaaf..4de4069 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
@@ -30,8 +30,11 @@ import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
@@ -63,7 +66,7 @@ public class GetQueryReport {
* @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
* @return A map from Query ID to QueryReport that holds a report for all of
* the queries that are being managed within the fluo app.
- * @throws UnsupportedQueryException
+ * @throws UnsupportedQueryException
*/
public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) throws UnsupportedQueryException {
checkNotNull(fluo);
@@ -86,7 +89,7 @@ public class GetQueryReport {
* @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
* @param queryId - The ID of the query to fetch. (not null)
* @return A report that was built for the query.
- * @throws UnsupportedQueryException
+ * @throws UnsupportedQueryException
*/
public QueryReport getReport(final FluoClient fluo, final String queryId) throws UnsupportedQueryException {
checkNotNull(fluo);
@@ -132,14 +135,20 @@ public class GetQueryReport {
checkNotNull(nodeId);
checkNotNull(bindingSetColumn);
+ NodeType type = NodeType.fromNodeId(nodeId).get();
+ Bytes prefixBytes = Bytes.of(type.getNodeTypePrefix());
+
// Limit the scan to the binding set column and node id.
- final RowScanner rows = sx.scanner().over(Span.prefix(nodeId)).fetch(bindingSetColumn).byRow().build();
+ final RowScanner rows = sx.scanner().over(Span.prefix(prefixBytes)).fetch(bindingSetColumn).byRow().build();
BigInteger count = BigInteger.valueOf(0L);
for (ColumnScanner columns : rows) {
- count = count.add( BigInteger.ONE );
- }
-
+ String row = BindingSetRow.makeFromShardedRow(prefixBytes, columns.getRow()).getNodeId();
+ if (row.equals(nodeId)) {
+ count = count.add(BigInteger.ONE);
+ }
+ }
+
return count;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
index 1e86836..b784fbf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
@@ -19,24 +19,24 @@
package org.apache.rya.indexing.pcj.fluo.api;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.addTriplePrefixAndConvertToBytes;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-
-import com.google.common.base.Optional;
-
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.api.resolver.triple.TripleRowResolverException;
import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import com.google.common.base.Optional;
/**
* Insert a batch of Triples into. This will trigger observers that will update
@@ -79,7 +79,7 @@ public class InsertTriples {
try(Transaction tx = fluo.newTransaction()) {
for(final RyaStatement triple : triples) {
try {
- tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or("")));
+ tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or("")));
} catch (final TripleRowResolverException e) {
log.error("Could not convert a Triple into the SPO format: " + triple);
}
@@ -88,7 +88,7 @@ public class InsertTriples {
tx.commit();
}
}
-
+
/**
* Inserts a triple into Fluo.
*
@@ -101,9 +101,9 @@ public class InsertTriples {
insert(fluo, Collections.singleton(triple));
}
-
+
/**
- * Insert a batch of RyaStatements into Fluo.
+ * Insert a batch of RyaStatements into Fluo.
*
* @param fluo - A connection to the Fluo table that will be updated. (not null)
* @param triples - The triples to insert. (not null)
@@ -116,7 +116,7 @@ public class InsertTriples {
for(final RyaStatement triple : triples) {
Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
try {
- tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+ tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
} catch (final TripleRowResolverException e) {
log.error("Could not convert a Triple into the SPO format: " + triple);
}
@@ -125,8 +125,6 @@ public class InsertTriples {
tx.commit();
}
}
-
-
/**
* Converts a triple into a byte[] holding the Rya SPO representation of it.
@@ -135,10 +133,10 @@ public class InsertTriples {
* @return The Rya SPO representation of the triple.
* @throws TripleRowResolverException The triple could not be converted.
*/
- public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+ public static Bytes spoFormat(final RyaStatement triple) throws TripleRowResolverException {
checkNotNull(triple);
final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
- return spoRow.getRow();
+ return addTriplePrefixAndConvertToBytes(spoRow.getRow());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index fd624eb..4839c04 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -1,16 +1,22 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- you under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rya</groupId>
@@ -60,6 +66,14 @@
<artifactId>fluo-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-recipes-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-recipes-accumulo</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.openrdf.sesame</groupId>
<artifactId>sesame-queryrender</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
new file mode 100644
index 0000000..62fc48c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * This class provides the common functionality for generating sharded row
+ * keys for any class that extends it.
+ *
+ */
+public abstract class AbstractNodeUpdater {
+
+ /**
+ * Generates a sharded row key from the provided arguments for inserting new results
+ * into the Fluo table. The row key is of the form node prefix + shardId + nodeId + BindingSet values.
+ * The shardId is formed from a hash of the first Binding value as indicated by the VariableOrder.
+ * @param nodeId - query nodeId corresponding to new result
+ * @param varOrder - varOrder used to order Binding values for new row key
+ * @param bs - BindingSet that values will be taken from to form key
+ * @return - sharded row key formed from the provided arguments
+ */
+ public Bytes makeRowKey(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
+ return BindingHashShardingFunction.addShard(nodeId, varOrder, bs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
index 2e41cb1..3acf3e9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
@@ -42,7 +42,6 @@ import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.openrdf.model.Literal;
@@ -65,7 +64,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* Updates the results of an Aggregate node when its child has added a new Binding Set to its results.
*/
@DefaultAnnotation(NonNull.class)
-public class AggregationResultUpdater {
+public class AggregationResultUpdater extends AbstractNodeUpdater {
private static final Logger log = Logger.getLogger(AggregationResultUpdater.class);
private static final AggregationStateSerDe AGG_STATE_SERDE = new ObjectSerializationAggregationStateSerDe();
@@ -104,7 +103,7 @@ public class AggregationResultUpdater {
// The Row ID for the Aggregation State that needs to be updated is defined by the Group By variables.
final String aggregationNodeId = aggregationMetadata.getNodeId();
final VariableOrder groupByVars = aggregationMetadata.getGroupByVariableOrder();
- final Bytes rowId = RowKeyUtil.makeRowKey(aggregationNodeId, groupByVars, childBindingSet);
+ final Bytes rowId = makeRowKey(aggregationNodeId, groupByVars, childBindingSet);
// Load the old state from the bytes if one was found; otherwise initialize the state.
final Optional<Bytes> stateBytes = Optional.ofNullable( tx.get(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET) );
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
index 2e45ea6..9b7558f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
@@ -21,15 +21,17 @@ package org.apache.rya.indexing.pcj.fluo.app;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import java.util.Objects;
+
import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import net.jcip.annotations.Immutable;
/**
- * The values of an Accumulo Row ID for a row that stores a Binding set for
- * a specific Node ID of a query.
+ * The values of an Accumulo Row ID for a row that stores a Binding set for a specific Node ID of a query.
*/
@Immutable
@DefaultAnnotation(NonNull.class)
@@ -77,4 +79,39 @@ public class BindingSetRow {
final String bindingSetString = rowArray.length == 2 ? rowArray[1] : "";
return new BindingSetRow(nodeId, bindingSetString);
}
+
+ /**
+ * Creates a BindingSetRow from a sharded row entry, where the row is sharded according to
+ * {@link BindingHashShardingFunction}.
+ *
+ * @param prefixBytes - prefix of the node type that the row corresponds to (see prefixes in
+ * {@link IncrementalUpdateConstants}).
+ * @param row - row that BindingSetRow is created from
+ * @return - BindingSetRow object
+ */
+ public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) {
+ return make(BindingHashShardingFunction.removeHash(prefixBytes, row));
+ }
+
+ @Override
+ public String toString() {
+ return "NodeId: " + nodeId + "\n" + "BindingSet String: " + bindingSetString;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if(this == other) { return true;}
+
+ if (other instanceof BindingSetRow) {
+ BindingSetRow row = (BindingSetRow) other;
+ return Objects.equals(nodeId, row.nodeId) && Objects.equals(bindingSetString, row.bindingSetString);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(nodeId, bindingSetString);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
index 6642780..b50e862 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
@@ -28,22 +28,21 @@ import org.apache.rya.api.domain.RyaSubGraph;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
/**
* This class creates results for the ConstructQuery. This class applies the {@link ConstructGraph}
* associated with the Construct Query to generate a collection of {@link RyaStatement}s. These statements
- * are then used to form a {@link RyaSubGraph} that is serialized and stored as a value in the Column
+ * are then used to form a {@link RyaSubGraph} that is serialized and stored as a value in the Column
* {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}.
*
*/
-public class ConstructQueryResultUpdater {
+public class ConstructQueryResultUpdater extends AbstractNodeUpdater {
private static final Logger log = Logger.getLogger(ConstructQueryResultUpdater.class);
private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
-
+
/**
* Updates the Construct Query results by applying the {@link ConnstructGraph} to
* create a {@link RyaSubGraph} and then writing the subgraph to {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}.
@@ -52,15 +51,15 @@ public class ConstructQueryResultUpdater {
* @param metadata - metadata that the ConstructProjection is extracted from
*/
public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) {
-
+
String nodeId = metadata.getNodeId();
VariableOrder varOrder = metadata.getVariableOrder();
Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS;
ConstructGraph graph = metadata.getConstructGraph();
String parentId = metadata.getParentNodeId();
-
+
// Create the Row Key for the emitted binding set. It does not contain visibilities.
- final Bytes resultRow = RowKeyUtil.makeRowKey(nodeId, varOrder, bs);
+ final Bytes resultRow = makeRowKey(nodeId, varOrder, bs);
// If this is a new binding set, then emit it.
if(tx.get(resultRow, column) == null || varOrder.getVariableOrders().size() < bs.size()) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 2cc9f77..c829156 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -57,7 +56,7 @@ import info.aduna.iteration.CloseableIteration;
* Set to its results.
*/
@DefaultAnnotation(NonNull.class)
-public class FilterResultUpdater {
+public class FilterResultUpdater extends AbstractNodeUpdater {
private static final Logger log = LoggerFactory.getLogger(FilterResultUpdater.class);
@@ -114,7 +113,7 @@ public class FilterResultUpdater {
// Create the Row Key for the emitted binding set. It does not contain visibilities.
final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
- final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, childBindingSet);
+ final Bytes resultRow = makeRowKey(filterMetadata.getNodeId(), filterVarOrder, childBindingSet);
// Serialize and emit BindingSet
final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
index 602fd9d..e0483ab 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
+import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.removeTriplePrefixAndConvertToByteArray;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
@@ -41,8 +42,15 @@ public class IncUpdateDAO {
private static final WholeRowTripleResolver tr = new WholeRowTripleResolver();
+ /**
+ * Deserializes a triple stored in the Fluo table.
+ * @param row - serialized triple
+ * @return - triple deserialized as a RyaStatement
+ */
public static RyaStatement deserializeTriple(final Bytes row) {
- final byte[] rowArray = row.toArray();
+
+ checkNotNull(row);
+ final byte[] rowArray = removeTriplePrefixAndConvertToByteArray(row);
RyaStatement rs = null;
try {
@@ -55,6 +63,7 @@ public class IncUpdateDAO {
}
public static String getTripleString(final RyaStatement rs) {
+ checkNotNull(rs);
final String subj = rs.getSubject().getData() + TYPE_DELIM + URI_TYPE;
final String pred = rs.getPredicate().getData() + TYPE_DELIM + URI_TYPE;
final String objData = rs.getObject().getData();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index 5405837..63b9715 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -30,15 +30,18 @@ public class IncrementalUpdateConstants {
public static final String TYPE_DELIM = "<<~>>";
//to be used in construction of id for each node
- public static final String SP_PREFIX = "STATEMENT_PATTERN";
- public static final String JOIN_PREFIX = "JOIN";
- public static final String FILTER_PREFIX = "FILTER";
- public static final String AGGREGATION_PREFIX = "AGGREGATION";
- public static final String QUERY_PREFIX = "QUERY";
- public static final String PROJECTION_PREFIX = "PROJECTION";
- public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
- public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
-
+ public static final String TRIPLE_PREFIX = "T";
+ public static final String SP_PREFIX = "SP";
+ public static final String JOIN_PREFIX = "J";
+ public static final String FILTER_PREFIX = "F";
+ public static final String AGGREGATION_PREFIX = "A";
+ public static final String QUERY_PREFIX = "Q";
+ public static final String PROJECTION_PREFIX = "PR";
+ public static final String CONSTRUCT_PREFIX = "C";
+ public static final String PERIODIC_QUERY_PREFIX = "PE";
+
+ public static final String STATEMENT_PATTERN_ID = "SP_ID";
+
//binding name reserved for periodic bin id for periodic query results
public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index fb3ee0c..bcce77d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -19,8 +19,6 @@
package org.apache.rya.indexing.pcj.fluo.app;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
-import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
import java.util.ArrayList;
import java.util.HashSet;
@@ -43,13 +41,12 @@ import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.MapBindingSet;
@@ -65,14 +62,11 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* new Binding Set to its results.
*/
@DefaultAnnotation(NonNull.class)
-public class JoinResultUpdater {
+public class JoinResultUpdater extends AbstractNodeUpdater {
private static final Logger log = Logger.getLogger(JoinResultUpdater.class);
-
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
-
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+ private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
/**
* Updates the results of a Join node when one of its children has added a
@@ -130,7 +124,7 @@ public class JoinResultUpdater {
Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, siblingId);
Column siblingColumn = getScanColumnFamily(siblingId);
Optional<RowColumn> rowColumn = fillSiblingBatch(tx, siblingSpan, siblingColumn, siblingBindingSets, joinMetadata.getJoinBatchSize());
-
+
// Iterates over the resulting BindingSets from the join.
final Iterator<VisibilityBindingSet> newJoinResults;
if(emittingSide == Side.LEFT) {
@@ -145,8 +139,8 @@ public class JoinResultUpdater {
final VisibilityBindingSet newJoinResult = newJoinResults.next();
// Create the Row Key for the emitted binding set. It does not contain visibilities.
- final Bytes resultRow = RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult);
-
+ final Bytes resultRow = makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult);
+
// Only insert the join Binding Set if it is new or BindingSet contains values not used in resultRow.
if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null || joinVarOrder.getVariableOrders().size() < newJoinResult.size()) {
// Create the Node Value. It does contain visibilities.
@@ -159,7 +153,7 @@ public class JoinResultUpdater {
tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, nodeValueBytes);
}
}
-
+
// if batch limit met, there are additional entries to process
// update the span and register updated batch job
if (rowColumn.isPresent()) {
@@ -183,18 +177,18 @@ public class JoinResultUpdater {
public static enum Side {
LEFT, RIGHT;
}
-
-
+
+
/**
* Fetches batch to be processed by scanning over the Span specified by the
* {@link JoinBatchInformation}. The number of results is less than or equal
* to the batch size specified by the JoinBatchInformation.
- *
+ *
* @param tx - Fluo transaction in which batch operation is performed
* @param siblingSpan - span of sibling to retrieve elements to join with
* @param bsSet- set that batch results are added to
* @return Set - containing results of sibling scan.
- * @throws Exception
+ * @throws Exception
*/
private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, Span siblingSpan, Column siblingColumn, Set<VisibilityBindingSet> bsSet, int batchSize) throws Exception {
@@ -222,7 +216,7 @@ public class JoinResultUpdater {
return Optional.absent();
}
}
-
+
/**
* Creates a Span for the sibling node to retrieve BindingSets to join with
* @param tx
@@ -231,29 +225,19 @@ public class JoinResultUpdater {
* @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update
* @return Span to retrieve sibling node's BindingSets to form join results
*/
- private Span getSpan(TransactionBase tx, final String childId, final BindingSet childBindingSet, final String siblingId) {
+ private Span getSpan(TransactionBase tx, final String childId, final VisibilityBindingSet childBindingSet, final String siblingId) {
// Get the common variable orders. These are used to build the prefix.
final VariableOrder childVarOrder = getVarOrder(tx, childId);
final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder);
- // Get the Binding strings
- final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
- final String[] childBindingArray = childBindingSetString.split("\u0001");
- final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]);
-
- // Create the prefix that will be used to scan for binding sets of the sibling node.
- // This prefix includes the sibling Node ID and the common variable values from
- // childBindingSet.
- String siblingScanPrefix = "";
- for(int i = 0; i < commonVars.size(); i++) {
- if(siblingScanPrefix.length() == 0) {
- siblingScanPrefix = childBindingStrings[i];
- } else {
- siblingScanPrefix += DELIM + childBindingStrings[i];
- }
+ Bytes siblingScanPrefix = null;
+ if(!commonVars.isEmpty()) {
+ siblingScanPrefix = makeRowKey(siblingId, new VariableOrder(commonVars), childBindingSet);
+ } else {
+ siblingScanPrefix = makeRowKey(siblingId, siblingVarOrder, childBindingSet);
}
- siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix;
+
return Span.prefix(siblingScanPrefix);
}
@@ -277,13 +261,13 @@ public class JoinResultUpdater {
return removeBinIdFromVarOrder(queryDao.readFilterMetadata(tx, nodeId).getVariableOrder());
case JOIN:
return removeBinIdFromVarOrder(queryDao.readJoinMetadata(tx, nodeId).getVariableOrder());
- case PROJECTION:
+ case PROJECTION:
return removeBinIdFromVarOrder(queryDao.readProjectionMetadata(tx, nodeId).getVariableOrder());
default:
throw new IllegalArgumentException("Could not figure out the variable order for node with ID: " + nodeId);
}
}
-
+
private VariableOrder removeBinIdFromVarOrder(VariableOrder varOrder) {
List<String> varOrderList = varOrder.getVariableOrders();
if(varOrderList.get(0).equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
@@ -311,7 +295,7 @@ public class JoinResultUpdater {
final List<String> commonVars = new ArrayList<>();
- // Only need to iteratre through the shorted order's length.
+ // Only need to iterate through the shorted order's length.
final Iterator<String> vars1It = vars1.iterator();
final Iterator<String> vars2It = vars2.iterator();
while(vars1It.hasNext() && vars2It.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
index cb331cf..9b9b3ae 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java
@@ -27,7 +27,6 @@ import org.apache.fluo.api.data.Column;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import org.openrdf.model.Literal;
@@ -39,10 +38,10 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet;
/**
* This class adds the appropriate BinId Binding to each BindingSet that it processes. The BinIds
* are used to determine which period a BindingSet (with a temporal Binding) falls into so that
- * a user can receive periodic updates for a registered query.
+ * a user can receive periodic updates for a registered query.
*
*/
-public class PeriodicQueryUpdater {
+public class PeriodicQueryUpdater extends AbstractNodeUpdater {
private static final Logger log = Logger.getLogger(PeriodicQueryUpdater.class);
private static final ValueFactory vf = new ValueFactoryImpl();
@@ -65,9 +64,9 @@ public class PeriodicQueryUpdater {
binnedBs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(id));
VisibilityBindingSet visibilityBindingSet = new VisibilityBindingSet(binnedBs, bs.getVisibility());
Bytes periodicBsBytes = BS_SERDE.serialize(visibilityBindingSet);
-
- //create row
- final Bytes resultRow = RowKeyUtil.makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), binnedBs);
+
+ //create row
+ final Bytes resultRow = makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), visibilityBindingSet);
Column col = FluoQueryColumns.PERIODIC_QUERY_BINDING_SET;
tx.set(resultRow, col, periodicBsBytes);
}
@@ -75,8 +74,8 @@ public class PeriodicQueryUpdater {
/**
* This method returns the end times of all period windows containing the time contained in
- * the BindingSet.
- *
+ * the BindingSet.
+ *
* @param metadata
* @return Set of period bin end times
*/
@@ -97,7 +96,7 @@ public class PeriodicQueryUpdater {
private long getRightBinEndPoint(long eventDateTime, long periodDuration) {
return (eventDateTime / periodDuration + 1) * periodDuration;
}
-
+
private long getLeftBinEndPoint(long eventTime, long periodDuration) {
return (eventTime / periodDuration) * periodDuration;
}
@@ -116,7 +115,7 @@ public class PeriodicQueryUpdater {
long rightEventBin = getRightBinEndPoint(eventDateTime, periodDuration);
//get the bin left of the current moment for comparison
long currentBin = getLeftBinEndPoint(System.currentTimeMillis(), periodDuration);
-
+
if(currentBin >= rightEventBin) {
long numBins = (windowDuration -(currentBin - rightEventBin))/periodDuration;
for(int i = 0; i < numBins; i++) {
@@ -132,6 +131,6 @@ public class PeriodicQueryUpdater {
return binIds;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
index f9d8257..e928e3d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java
@@ -26,7 +26,6 @@ import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -40,7 +39,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* new Binding Set to its results.
*/
@DefaultAnnotation(NonNull.class)
-public class ProjectionResultUpdater {
+public class ProjectionResultUpdater extends AbstractNodeUpdater {
private static final Logger log = Logger.getLogger(QueryResultUpdater.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
@@ -65,7 +64,7 @@ public class ProjectionResultUpdater {
log.trace(
"Transaction ID: " + tx.getStartTimestamp() + "\n" +
"Node ID: " + projectionMetadata.getNodeId() + "\n" +
- "Parent Node ID: " + projectionMetadata.getParentNodeId() + "\n" +
+ "Parent Node ID: " + projectionMetadata.getParentNodeId() + "\n" +
"Child Node ID: " + projectionMetadata.getChildNodeId() + "\n" +
"Child Binding Set:\n" + childBindingSet + "\n");
@@ -73,12 +72,13 @@ public class ProjectionResultUpdater {
final VariableOrder queryVarOrder = projectionMetadata.getVariableOrder();
final VariableOrder projectionVarOrder = projectionMetadata.getProjectedVars();
final BindingSet queryBindingSet = BindingSetUtil.keepBindings(projectionVarOrder, childBindingSet);
+ VisibilityBindingSet projectedBs = new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility());
// Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables.
- Bytes resultRow = RowKeyUtil.makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, queryBindingSet);
+ Bytes resultRow = makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, projectedBs);
// Create the Binding Set that goes in the Node Value. It does contain visibilities.
- final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()));
+ final Bytes nodeValueBytes = BS_SERDE.serialize(projectedBs);
log.trace(
"Transaction ID: " + tx.getStartTimestamp() + "\n" +
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index 37d7256..6c383e2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -38,7 +37,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* new Binding Set to its results.
*/
@DefaultAnnotation(NonNull.class)
-public class QueryResultUpdater {
+public class QueryResultUpdater extends AbstractNodeUpdater {
private static final Logger log = Logger.getLogger(QueryResultUpdater.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
@@ -70,7 +69,7 @@ public class QueryResultUpdater {
final VariableOrder queryVarOrder = queryMetadata.getVariableOrder();
// Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables.
- final Bytes resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, childBindingSet);
+ final Bytes resultRow = makeRowKey(queryMetadata.getNodeId(), queryVarOrder, childBindingSet);
// Create the Binding Set that goes in the Node Value. It does contain visibilities.
final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
index 9584a10..c077310 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
@@ -23,6 +23,8 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
/**
* This class provides common functionality for implementations of {@link BatchBindingSetUpdater}.
@@ -30,6 +32,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
*/
public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetUpdater {
+ protected static final FluoQueryMetadataCache CACHE = MetadataCacheSupplier.getOrCreateCache();
+
/**
* Updates the Span to create a new {@link BatchInformation} object to be fed to the
* {@link BatchObserver}. This message is called in the event that the BatchBindingSetUpdater
@@ -41,7 +45,7 @@ public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetU
public static Span getNewSpan(RowColumn newStart, Span oldSpan) {
return new Span(newStart, oldSpan.isStartInclusive(), oldSpan.getEnd(), oldSpan.isEndInclusive());
}
-
+
/**
* Cleans up old batch job. This method is meant to be called by any overriding method
* to clean up old batch tasks.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
index 2da3e39..e15fa8f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
@@ -36,7 +36,7 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation
* Create AbstractBatchInformation
* @param batchSize - size of batch to be processed
* @param task - type of task processed (Add, Delete, Udpate)
- * @param column - Cpolumn that Span notification is applied
+ * @param column - Column that Span notification is applied
* @param span - span used to indicate where processing should begin
*/
public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) {
@@ -62,7 +62,7 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation
public void setSpan(Span span) {
this.span = span;
}
-
+
@Override
public String toString() {
return new StringBuilder()
@@ -74,7 +74,7 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation
.append("}")
.toString();
}
-
+
@Override
public boolean equals(Object other) {
if (this == other) {
@@ -94,6 +94,6 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation
public int hashCode() {
return Objects.hash(super.getBatchSize(), span, super.getColumn(), super.getTask());
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
index a266341..a883be1 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
@@ -37,9 +37,8 @@ import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -53,7 +52,6 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
private static final Logger log = Logger.getLogger(JoinBatchBindingSetUpdater.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
/**
* Processes {@link JoinBatchInformation}. Updates the BindingSets
@@ -65,7 +63,7 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
* entries that need to be updated exceeds the batch size, the row of the
* first unprocessed BindingSets is used to create a new JoinBatch job to
* process the remaining BindingSets.
- * @throws Exception
+ * @throws Exception
*/
@Override
public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception {
@@ -100,15 +98,15 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
newJoinResults = joinAlgorithm.newRightResult(bsSet.iterator(), bs);
}
- // Insert the new join binding sets to the Fluo table.
- final JoinMetadata joinMetadata = dao.readJoinMetadata(tx, nodeId);
+ // Read join metadata, create new join BindingSets and insert them into the Fluo table.
+ final JoinMetadata joinMetadata = CACHE.readJoinMetadata(tx, nodeId);
final VariableOrder joinVarOrder = joinMetadata.getVariableOrder();
while (newJoinResults.hasNext()) {
final VisibilityBindingSet newJoinResult = newJoinResults.next();
//create BindingSet value
Bytes bsBytes = BS_SERDE.serialize(newJoinResult);
//make rowId
- Bytes rowKey = RowKeyUtil.makeRowKey(nodeId, joinVarOrder, newJoinResult);
+ Bytes rowKey = BindingHashShardingFunction.addShard(nodeId, joinVarOrder, newJoinResult);
final Column col = FluoQueryColumns.JOIN_BINDING_SET;
processTask(tx, task, rowKey, col, bsBytes);
}
@@ -144,12 +142,12 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
* Fetches batch to be processed by scanning over the Span specified by the
* {@link JoinBatchInformation}. The number of results is less than or equal
* to the batch size specified by the JoinBatchInformation.
- *
+ *
* @param tx - Fluo transaction in which batch operation is performed
* @param batch - batch order to be processed
* @param bsSet- set that batch results are added to
* @return Set - containing results of sibling scan.
- * @throws Exception
+ * @throws Exception
*/
private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, JoinBatchInformation batch, Set<VisibilityBindingSet> bsSet) throws Exception {
[2/4] incubator-rya git commit: RYA-406. Closes #251.
Posted by ca...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
new file mode 100644
index 0000000..e095309
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds or removes a hash to or from the rowId for sharding purposes. When creating a sharded row key, it
+ * takes the form: node_prefix:shardId:nodeId//Binding_values. For example, the row key generated from nodeId = SP_123,
+ * varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob)
+ * indicates the shard id hash generated from the Binding value "uri:Bob".
+ *
+ */
+public class BindingHashShardingFunction {
+
+ private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+ private static final int HASH_LEN = 4;
+
+ /**
+ * Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For
+ * example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
+ * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
+ * Binding value "uri:Bob".
+ *
+ * @param nodeId - Node Id with type and UUID
+ * @param varOrder - VarOrder used to order BindingSet values
+ * @param bs - BindingSet with partially formed query values
+ * @return - serialized Bytes rowId for storing BindingSet results in Fluo
+ */
+ public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
+ checkNotNull(nodeId);
+ checkNotNull(varOrder);
+ checkNotNull(bs);
+ String[] rowPrefixAndId = nodeId.split("_");
+ Preconditions.checkArgument(rowPrefixAndId.length == 2);
+ String prefix = rowPrefixAndId[0];
+ String id = rowPrefixAndId[1];
+
+ String firstBindingString = "";
+ Bytes rowSuffix = Bytes.of(id);
+ if (varOrder.getVariableOrders().size() > 0) {
+ VariableOrder first = new VariableOrder(varOrder.getVariableOrders().get(0));
+ firstBindingString = BS_CONVERTER.convert(bs, first);
+ rowSuffix = RowKeyUtil.makeRowKey(id, varOrder, bs);
+ }
+
+ BytesBuilder builder = Bytes.builder();
+ builder.append(Bytes.of(prefix + ":"));
+ builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + firstBindingString)));
+ builder.append(":");
+ builder.append(rowSuffix);
+ return builder.toBytes();
+ }
+
+ /**
+ * Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For
+ * example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
+ * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
+ * Binding value "uri:Bob".
+ *
+ * @param nodeId - Node Id with type and UUID
+ * @param firstBsVal - String representation of the first BsValue
+ * @return - serialized Bytes prefix for scanning rows
+ */
+ public static Bytes getShardedScanPrefix(String nodeId, Value firstBsVal) {
+ checkNotNull(nodeId);
+ checkNotNull(firstBsVal);
+
+ final RyaType ryaValue = RdfToRyaConversions.convertValue(firstBsVal);
+ final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
+
+ return getShardedScanPrefix(nodeId, bindingString);
+ }
+
+ /**
+ * Generates a sharded rowId from the indicated nodeId and bindingString. For example, the row key generated from
+ * nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
+ * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
+ * Binding value "uri:Bob".
+ *
+ * @param nodeId - NodeId with tyep and UUID
+ * @param bindingString - String representation of BindingSet values, as formed by {@link BindingSetStringConverter}
+ * @return - serialized, sharded Bytes prefix
+ */
+ public static Bytes getShardedScanPrefix(String nodeId, String bindingString) {
+ checkNotNull(nodeId);
+ checkNotNull(bindingString);
+ String[] rowPrefixAndId = nodeId.split("_");
+ Preconditions.checkArgument(rowPrefixAndId.length == 2);
+ String prefix = rowPrefixAndId[0];
+ String id = rowPrefixAndId[1];
+
+ BytesBuilder builder = Bytes.builder();
+ builder.append(prefix + ":");
+ builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + bindingString)));
+ builder.append(":");
+ builder.append(id);
+ builder.append(NODEID_BS_DELIM);
+ builder.append(bindingString);
+ return builder.toBytes();
+ }
+
+ private static boolean hasHash(Bytes prefixBytes, Bytes row) {
+ for (int i = prefixBytes.length() + 1; i < prefixBytes.length() + HASH_LEN; i++) {
+ byte b = row.byteAt(i);
+ boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
+ if (!isAlphaNum) {
+ return false;
+ }
+ }
+
+ if (row.byteAt(prefixBytes.length()) != ':' || row.byteAt(prefixBytes.length() + HASH_LEN + 1) != ':') {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @return Returns input with prefix and hash stripped from beginning.
+ */
+ public static Bytes removeHash(Bytes prefixBytes, Bytes row) {
+ checkNotNull(prefixBytes);
+ checkNotNull(row);
+ checkArgument(row.length() >= prefixBytes.length() + 6, "Row is shorter than expected " + row);
+ checkArgument(row.subSequence(0, prefixBytes.length()).equals(prefixBytes),
+ "Row does not have expected prefix " + row);
+ checkArgument(hasHash(prefixBytes, row), "Row does not have expected hash " + row);
+
+ BytesBuilder builder = Bytes.builder();
+ builder.append(prefixBytes);
+ builder.append("_");
+ builder.append(row.subSequence(prefixBytes.length() + 6, row.length()));
+ return builder.toBytes();
+ }
+
+ private static String genHash(Bytes row) {
+ int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
+ hash = hash & 0x7fffffff;
+ // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
+ // nice for debugging.
+ String hashString = Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
+ hashString = hashString.substring(hashString.length() - HASH_LEN);
+ return hashString;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
new file mode 100644
index 0000000..d451d28
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TRIPLE_PREFIX;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application. The Triple prefix is added
+ * to supported range based compactions for removing transient data from the Fluo
+ * application. This prefix supports the Transient data recipe described on the
+ * the Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+ private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of(TRIPLE_PREFIX + NODEID_BS_DELIM);
+
+ /**
+ * Prepends the triple prefix to the provided bytes and returns the new value as a {@link Bytes}.
+ * @param tripleBytes - serialized {@link RyaStatement}
+ * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes
+ */
+ public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) {
+ checkNotNull(tripleBytes);
+ BytesBuilder builder = Bytes.builder();
+ return builder.append(TRIPLE_PREFIX_BYTES).append(tripleBytes).toBytes();
+ }
+
+ /**
+ * Removes the triple prefix and returns the new value as a byte array.
+ * @param prefixedTriple - serialized RyaStatement with prepended triple prefix, converted to Bytes
+ * @return - serialized {@link RyaStatement} in byte array form
+ */
+ public static byte[] removeTriplePrefixAndConvertToByteArray(Bytes prefixedTriple) {
+ checkNotNull(prefixedTriple);
+ return prefixedTriple.subSequence(TRIPLE_PREFIX_BYTES.length(), prefixedTriple.length()).toArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
index 3df3708..f0faf1f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.rya.indexing.pcj.fluo.app.query;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
new file mode 100644
index 0000000..657b548
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Sets;
+
+public class StatementPatternIdCacheTest {
+
+ @Test
+ public void testCache() {
+ Transaction mockTx = Mockito.mock(Transaction.class);
+ String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+ when(mockTx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)).thenReturn(Bytes.of(nodeId));
+ when(mockTx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH)).thenReturn(Bytes.of("123"));
+
+ StatementPatternIdCache cache = new StatementPatternIdCache();
+ Set<String> ids1 = cache.getStatementPatternIds(mockTx);
+ Set<String> ids2 = cache.getStatementPatternIds(mockTx);
+
+ Assert.assertEquals(ids1, ids2);
+ Assert.assertEquals(Sets.newHashSet(nodeId), ids1);
+
+ Mockito.verify(mockTx, Mockito.times(1)).get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
new file mode 100644
index 0000000..729cdeb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class BindingHashShardingFunctionTest {
+
+ private static final ValueFactory vf = new ValueFactoryImpl();
+
+ @Test
+ public void shardAddAndRemoveTest() {
+ String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("entity", vf.createURI("urn:entity"));
+ bs.addBinding("location", vf.createLiteral("location_1"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+ VariableOrder varOrder = new VariableOrder("entity","location");
+ Bytes row = RowKeyUtil.makeRowKey(nodeId, varOrder, vBs);
+ Bytes shardedRow = BindingHashShardingFunction.addShard(nodeId, varOrder, vBs);
+ Bytes shardlessRow = BindingHashShardingFunction.removeHash(Bytes.of(SP_PREFIX), shardedRow);
+ Assert.assertEquals(row, shardlessRow);
+ }
+
+ @Test
+ public void bindingSetRowTest() {
+ String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+ QueryBindingSet bs = new QueryBindingSet();
+ bs.addBinding("entity", vf.createURI("urn:entity"));
+ bs.addBinding("location", vf.createLiteral("location_1"));
+ VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
+ VariableOrder varOrder = new VariableOrder("entity","location");
+ Bytes row = RowKeyUtil.makeRowKey(nodeId, varOrder, vBs);
+ Bytes shardedRow = BindingHashShardingFunction.addShard(nodeId, varOrder, vBs);
+ BindingSetRow expected = BindingSetRow.make(row);
+ BindingSetRow actual = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), shardedRow);
+ Assert.assertEquals(expected, actual);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
new file mode 100644
index 0000000..ac41dc8
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import java.util.Arrays;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TriplePrefixUtilsTest {
+
+ @Test
+ public void testAddRemovePrefix() throws TripleRowResolverException {
+ byte[] expected = Bytes.of("triple").toArray();
+ Bytes fluoBytes = TriplePrefixUtils.addTriplePrefixAndConvertToBytes(expected);
+ byte[] returned = TriplePrefixUtils.removeTriplePrefixAndConvertToByteArray(fluoBytes);
+ Assert.assertEquals(true, Arrays.equals(expected, returned));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index c038dbe..e594474 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -1,14 +1,21 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- you under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 66aa04b..7e882e9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -41,7 +41,7 @@ import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
@@ -54,13 +54,16 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import com.google.common.base.Optional;
@@ -70,6 +73,7 @@ public class BatchIT extends RyaExportITBase {
private static final Logger log = Logger.getLogger(BatchIT.class);
private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+ private static final ValueFactory vf = new ValueFactoryImpl();
@Test
public void simpleScanDelete() throws Exception {
@@ -89,16 +93,16 @@ public class BatchIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj()
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1");
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
// Verify the end results of the query match the expected results.
getMiniFluo().waitForObservers();
@@ -129,22 +133,26 @@ public class BatchIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj()
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(2);
String rightSp = ids.get(4);
QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("subject", new URIImpl("urn:subject_1"));
- bs.addBinding("object1", new URIImpl("urn:object_0"));
+ bs.addBinding("subject", vf.createURI("urn:subject_1"));
+ bs.addBinding("object1", vf.createURI("urn:object_0"));
VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
- Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+ //create sharded span for deletion
+ URI uri = vf.createURI("urn:subject_1");
+ Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri);
+ Span span = Span.prefix(prefixBytes);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 25, 5, 5));
@@ -175,21 +183,24 @@ public class BatchIT extends RyaExportITBase {
final String pcjId = pcjStorage.createPcj(sparql);
// Tell the Fluo app to maintain the PCJ.
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj()
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
String joinId = ids.get(2);
String rightSp = ids.get(4);
QueryBindingSet bs = new QueryBindingSet();
- bs.addBinding("subject", new URIImpl("urn:subject_1"));
- bs.addBinding("object1", new URIImpl("urn:object_0"));
+ bs.addBinding("subject", vf.createURI("urn:subject_1"));
+ bs.addBinding("object1", vf.createURI("urn:object_0"));
VisibilityBindingSet vBs = new VisibilityBindingSet(bs);
- Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1"));
+
+ URI uri = vf.createURI("urn:subject_1");
+ Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri);
+ Span span = Span.prefix(prefixBytes);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 5));
@@ -214,7 +225,7 @@ public class BatchIT extends RyaExportITBase {
RyaURI subj = new RyaURI("urn:subject_1");
RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-
+
Set<RyaStatement> statements1 = getRyaStatements(statement1, 15);
Set<RyaStatement> statements2 = getRyaStatements(statement2, 15);
@@ -224,22 +235,21 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj(5, 5)
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(225, 225, 225, 15, 15));
}
}
-
-
+
@Test
public void leftJoinBatchIntegrationTest() throws Exception {
final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; "
@@ -249,10 +259,10 @@ public class BatchIT extends RyaExportITBase {
RyaURI subj = new RyaURI("urn:subject_1");
RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null);
-
+
subj = new RyaURI("urn:subject_2");
RyaStatement statement3 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null);
-
+
Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
@@ -263,37 +273,35 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj(5, 5)
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
- inserter.insert(fluoClient, statements3, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
+ inserter.insert(fluoClient, statements3, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(110, 110, 110, 20, 10));
}
}
-
-
+
@Test
public void multiJoinBatchIntegrationTest() throws Exception {
final String sparql = "SELECT ?subject1 ?subject2 ?object1 ?object2 WHERE { ?subject1 <urn:predicate_1> ?object1; "
- + " <urn:predicate_2> ?object2 ."
- + " ?subject2 <urn:predicate_3> ?object2 } ";
+ + " <urn:predicate_2> ?object2 ." + " ?subject2 <urn:predicate_3> ?object2 } ";
try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) {
RyaURI subj1 = new RyaURI("urn:subject_1");
RyaStatement statement1 = new RyaStatement(subj1, new RyaURI("urn:predicate_1"), null);
RyaStatement statement2 = new RyaStatement(subj1, new RyaURI("urn:predicate_2"), null);
-
+
Set<RyaStatement> statements1 = getRyaStatements(statement1, 10);
Set<RyaStatement> statements2 = getRyaStatements(statement2, 10);
-
+
RyaURI subj2 = new RyaURI("urn:subject_2");
RyaStatement statement3 = new RyaStatement(subj2, new RyaURI("urn:predicate_3"), null);
Set<RyaStatement> statements3 = getRyaStatements(statement3, 10);
@@ -304,23 +312,22 @@ public class BatchIT extends RyaExportITBase {
// Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and
// batch size of joins to 5.
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(),
- getRyaInstanceName()).getQueryId();
+ String queryId = new CreateFluoPcj(5, 5)
+ .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId();
List<String> ids = getNodeIdStrings(fluoClient, queryId);
// Stream the data into Fluo.
InsertTriples inserter = new InsertTriples();
- inserter.insert(fluoClient, statements1, Optional.<String> absent());
- inserter.insert(fluoClient, statements2, Optional.<String> absent());
- inserter.insert(fluoClient, statements3, Optional.<String> absent());
+ inserter.insert(fluoClient, statements1, Optional.absent());
+ inserter.insert(fluoClient, statements2, Optional.absent());
+ inserter.insert(fluoClient, statements3, Optional.absent());
getMiniFluo().waitForObservers();
verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 100, 10, 10, 10));
}
}
-
private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) {
Set<RyaStatement> statements = new HashSet<>();
@@ -361,13 +368,19 @@ public class BatchIT extends RyaExportITBase {
for (int i = 0; i < ids.size(); i++) {
String id = ids.get(i);
String bsPrefix = prefixes.get(i);
+ URI uri = vf.createURI(bsPrefix);
+ Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(id, uri);
NodeType type = NodeType.fromNodeId(id).get();
Column bsCol = type.getResultColumn();
- String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix;
- Span span = Span.prefix(Bytes.of(row));
- BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span)
- .build();
- BatchInformationDAO.addBatch(tx, id, batch);
+ SpanBatchDeleteInformation.Builder builder = SpanBatchDeleteInformation.builder().setBatchSize(batchSize)
+ .setColumn(bsCol);
+ if (type == NodeType.JOIN) {
+ builder.setSpan(Span.prefix(type.getNodeTypePrefix()));
+ builder.setNodeId(java.util.Optional.of(id));
+ } else {
+ builder.setSpan(Span.prefix(prefixBytes));
+ }
+ BatchInformationDAO.addBatch(tx, id, builder.build());
}
tx.commit();
}
@@ -383,14 +396,19 @@ public class BatchIT extends RyaExportITBase {
private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) {
try (Transaction tx = fluoClient.newTransaction()) {
int count = 0;
- RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build();
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ Bytes prefixBytes = Bytes.of(type.get().getNodeTypePrefix());
+ RowScanner scanner = tx.scanner().over(Span.prefix(prefixBytes)).fetch(bsColumn).byRow().build();
Iterator<ColumnScanner> colScanners = scanner.iterator();
while (colScanners.hasNext()) {
ColumnScanner colScanner = colScanners.next();
- Iterator<ColumnValue> vals = colScanner.iterator();
- while (vals.hasNext()) {
- vals.next();
- count++;
+ BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(prefixBytes, colScanner.getRow());
+ if (bsRow.getNodeId().equals(nodeId)) {
+ Iterator<ColumnValue> vals = colScanner.iterator();
+ while (vals.hasNext()) {
+ vals.next();
+ count++;
+ }
}
}
tx.commit();
@@ -405,7 +423,6 @@ public class BatchIT extends RyaExportITBase {
int expected = expectedCounts.get(i);
NodeType type = NodeType.fromNodeId(id).get();
int count = countResults(fluoClient, id, type.getResultColumn());
- log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected);
switch (type) {
case STATEMENT_PATTERN:
assertEquals(expected, count);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 27b8222..ef5ab34 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -76,17 +76,18 @@ public class CreateDeleteIT extends RyaExportITBase {
// Create the PCJ in Fluo and load the statements into Rya.
final String pcjId = loadData(sparql, statements);
- try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(18, rows.size());
+ assertEquals(19, rows.size());
// Delete the PCJ from the Fluo application.
new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
+ getMiniFluo().waitForObservers();
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
- assertEquals(0, empty_rows.size());
+ assertEquals(1, empty_rows.size());
}
}
@@ -108,20 +109,21 @@ public class CreateDeleteIT extends RyaExportITBase {
// Create the PCJ in Fluo and load the statements into Rya.
final String pcjId = loadData(sparql, statements);
- try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) {
// Ensure the data was loaded.
final List<Bytes> rows = getFluoTableEntries(fluoClient);
- assertEquals(10, rows.size());
+ assertEquals(11, rows.size());
// Delete the PCJ from the Fluo application.
new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
+ getMiniFluo().waitForObservers();
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
- assertEquals(0, empty_rows.size());
+ assertEquals(1, empty_rows.size());
}
}
-
+
private String loadData(final String sparql, final Collection<Statement> statements) throws Exception {
requireNonNull(sparql);
@@ -133,14 +135,14 @@ public class CreateDeleteIT extends RyaExportITBase {
final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet());
// Write the data to Rya.
- final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
+ final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
ryaConn.begin();
ryaConn.add(statements);
ryaConn.commit();
ryaConn.close();
// Wait for the Fluo application to finish computing the end result.
- super.getMiniFluo().waitForObservers();
+ getMiniFluo().waitForObservers();
// The PCJ Id is the topic name the results will be written to.
return pcjId;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
index e61104a..eb21b34 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
@@ -109,11 +109,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")));
- runTest(query, statements, 29);
+ runTest(query, statements, 30);
}
-
+
private void runTest(String query, Collection<Statement> statements, int expectedEntries) throws Exception {
try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
@@ -134,10 +134,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase {
DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, storage);
deletePeriodic.deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notificationClient);
+ getMiniFluo().waitForObservers();
// Ensure all data related to the query has been removed.
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient);
- assertEquals(0, empty_rows.size());
+ assertEquals(1, empty_rows.size());
// Ensure that Periodic Service notified to add and delete PeriodicNotification
Set<CommandNotification> notifications;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
deleted file mode 100644
index fabf512..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package org.apache.rya.indexing.pcj.fluo.integration;
-
-import static java.util.Objects.requireNonNull;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeFactory;
-
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Statement;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class FluoLatencyIT extends KafkaExportITBase {
- private static ValueFactory vf;
- private static DatatypeFactory dtf;
-
- @BeforeClass
- public static void init() throws DatatypeConfigurationException {
- vf = new ValueFactoryImpl();
- dtf = DatatypeFactory.newInstance();
- }
-
- @Test
- public void resultsExported() throws Exception {
-
- final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { "
- + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "} " + "group by ?type";
-
-// final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { "
-// + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "}";
-
- try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
- // Tell the Fluo app to maintain the PCJ.
- String pcjId = FluoQueryUtils.createNewPcjId();
- FluoConfiguration conf = super.getFluoConfiguration();
- new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient);
- SailRepositoryConnection conn = super.getRyaSailRepository().getConnection();
-
- long start = System.currentTimeMillis();
- int numReturned = 0;
- int numObs = 10;
- int numTypes = 5;
- int numExpected = 0;
- int increment = numObs*numTypes;
- while (System.currentTimeMillis() - start < 60000) {
- List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now());
- conn.add(statements);
- numExpected += increment;
- System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: "
- + getNumFluoEntries(fluoClient));
- numReturned += readAllResults(pcjId).size();
- System.out
- .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned));
-// FluoITHelper.printFluoTable(conf);
- Thread.sleep(30000);
- }
- }
- }
-
- /**
- * Generates (numObservationsPerType x numTypes) statements of the form:
- *
- * <pre>
- * urn:obs_n uri:hasTime zonedTime
- * urn:obs_n uri:hasObsType typePrefix_m
- * </pre>
- *
- * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
- * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes.
- *
- * @param numObservationsPerType - The quantity of observations per type to generate.
- * @param numTypes - The number of types to generate observations for.
- * @param typePrefix - The prefix to be used for the type literal in the statement.
- * @param observationOffset - The offset to be used for determining the value of n in the above statements.
- * @param zonedTime - The time to be used for all observations generated.
- * @return A new list of all generated Statements.
- */
- public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix,
- final long observationOffset, final ZonedDateTime zonedTime) {
- final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
- final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
- final List<Statement> statements = Lists.newArrayList();
-
- for (long i = 0; i < numObservationsPerType; i++) {
- for (int j = 0; j < numTypes; j++) {
- final long observationId = observationOffset + i * numTypes + j;
- // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId;
- // final String obsId = "urn:obs_" + observationId;
- final String obsId = "urn:obs_" + String.format("%020d", observationId);
- final String type = typePrefix + j;
- // logger.info(obsId + " " + type + " " + litTime);
- statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime));
- statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type)));
- }
- }
-
- return statements;
- }
-
- private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
- requireNonNull(pcjId);
-
- // Read all of the results from the Kafka topic.
- final Set<VisibilityBindingSet> results = new HashSet<>();
-
- try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
- final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
- final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
- while (recordIterator.hasNext()) {
- results.add(recordIterator.next().value());
- }
- }
-
- return results;
- }
-
- private int getNumAccEntries(String tableName) throws TableNotFoundException {
- Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations());
- int count = 0;
- for (Map.Entry<Key, Value> entry : scanner) {
- count++;
- }
- return count;
- }
-
- private int getNumFluoEntries(FluoClient client) {
- Transaction tx = client.newTransaction();
- CellScanner scanner = tx.scanner().build();
- int count = 0;
- for (RowColumnValue rcv : scanner) {
- count++;
- }
- return count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index beaef32..2b87a97 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -123,9 +123,8 @@ public class QueryIT extends RyaExportITBase {
// and are skilled with computers. The resulting binding set includes everybody who
// was involved in the recruitment process.
final String sparql = "SELECT ?recruiter ?candidate ?leader " + "{ " + "?recruiter <http://recruiterFor> <http://GeekSquad>. "
- + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". "
- + "?leader <http://leaderOf> <http://GeekSquad>. " + "?recruiter <http://talksTo> ?candidate. "
- + "?candidate <http://talksTo> ?leader. " + "}";
+ + "?recruiter <http://talksTo> ?candidate. " + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". "
+ + "?candidate <http://talksTo> ?leader." + "?leader <http://leaderOf> <http://GeekSquad>. }";
// Create the Statements that will be loaded into Rya.
final ValueFactory vf = new ValueFactoryImpl();
@@ -426,10 +425,10 @@ public class QueryIT extends RyaExportITBase {
runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
-
+
@Test
public void dateTimeWithin() throws Exception {
-
+
final ValueFactory vf = new ValueFactoryImpl();
DatatypeFactory dtf = DatatypeFactory.newInstance();
FunctionRegistry.getInstance().add(new DateTimeWithinPeriod());
@@ -437,13 +436,13 @@ public class QueryIT extends RyaExportITBase {
final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">"
+ "SELECT ?event ?startTime ?endTime WHERE { ?event <uri:startTime> ?startTime; <uri:endTime> ?endTime. "
+ "FILTER(fn:dateTimeWithin(?startTime, ?endTime, 2,<" + OWLTime.HOURS_URI + "> ))}";
-
+
ZonedDateTime zTime = ZonedDateTime.now();
String time = zTime.format(DateTimeFormatter.ISO_INSTANT);
ZonedDateTime zTime1 = zTime.minusHours(1);
String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
+
ZonedDateTime zTime2 = zTime.minusHours(2);
String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
@@ -471,10 +470,10 @@ public class QueryIT extends RyaExportITBase {
// Verify the end results of the query match the expected results.
runTest(sparql, statements, expectedResults, ExportStrategy.RYA);
}
-
+
@Test
public void dateTimeWithinNow() throws Exception {
-
+
final ValueFactory vf = new ValueFactoryImpl();
DatatypeFactory dtf = DatatypeFactory.newInstance();
FunctionRegistry.getInstance().add(new DateTimeWithinPeriod());
@@ -482,13 +481,13 @@ public class QueryIT extends RyaExportITBase {
final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">"
+ "SELECT ?event ?startTime WHERE { ?event <uri:startTime> ?startTime. "
+ "FILTER(fn:dateTimeWithin(?startTime, NOW(), 15, <" + OWLTime.SECONDS_URI + "> ))}";
-
+
ZonedDateTime zTime = ZonedDateTime.now();
String time = zTime.format(DateTimeFormatter.ISO_INSTANT);
ZonedDateTime zTime1 = zTime.minusSeconds(30);
String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
+
Literal lit = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
Literal lit1 = vf.createLiteral(dtf.newXMLGregorianCalendar(time1));
@@ -511,7 +510,7 @@ public class QueryIT extends RyaExportITBase {
}
-
+
@Test
public void periodicQueryTestWithoutAggregation() throws Exception {
String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -800,8 +799,8 @@ public class QueryIT extends RyaExportITBase {
// Verify the end results of the query match the expected results.
runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
-
-
+
+
@Test
public void nestedPeriodicQueryTestWithAggregationAndGroupBy() throws Exception {
String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -879,7 +878,7 @@ public class QueryIT extends RyaExportITBase {
// Verify the end results of the query match the expected results.
runTest(query, statements, expectedResults, ExportStrategy.PERIODIC);
}
-
+
@Test
public void nestedJoinPeriodicQueryWithAggregationAndGroupBy() throws Exception {
String query = "prefix function: <http://org.apache.rya/function#> " // n
@@ -1006,6 +1005,7 @@ public class QueryIT extends RyaExportITBase {
// Ensure the result of the query matches the expected result.
assertEquals(expectedResults, results);
}
+
break;
case PERIODIC:
PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
@@ -1014,7 +1014,7 @@ public class QueryIT extends RyaExportITBase {
new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluo);
}
addStatementsAndWait(statements);
-
+
final Set<BindingSet> results = Sets.newHashSet();
try (CloseableIterator<BindingSet> resultIter = periodicStorage.listResults(periodicId, Optional.empty())) {
while (resultIter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
new file mode 100644
index 0000000..91c9977
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class StatementPatternIdCacheIT extends RyaExportITBase {
+
+ /**
+ * Ensure streamed matches are included in the result.
+ */
+ @Test
+ public void statementPatternIdCacheTest() throws Exception {
+ // A query that finds people who talk to Eve and work at Chipotle.
+ final String sparql1 =
+ "SELECT ?x WHERE { " +
+ "?x <urn:pred1> <urn:obj1>. " +
+ "?x <urn:pred2> <urn:obj2>." +
+ "}";
+
+ final String sparql2 =
+ "SELECT ?x WHERE { " +
+ "?x <urn:pred3> <urn:obj3>. " +
+ "?x <urn:pred4> <urn:obj4>." +
+ "}";
+
+ try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+
+ String pcjId = FluoQueryUtils.createNewPcjId();
+ // Tell the Fluo app to maintain the PCJ.
+ FluoQuery query1 = new CreateFluoPcj().createPcj(pcjId, sparql1, new HashSet<>(), fluoClient);
+ Set<String> spIds1 = new HashSet<>();
+ for(StatementPatternMetadata metadata: query1.getStatementPatternMetadata()) {
+ spIds1.add(metadata.getNodeId());
+ }
+
+ StatementPatternIdCache cache = new StatementPatternIdCache();
+
+ assertEquals(spIds1, cache.getStatementPatternIds(fluoClient.newTransaction()));
+
+ FluoQuery query2 = new CreateFluoPcj().createPcj(pcjId, sparql2, new HashSet<>(), fluoClient);
+ Set<String> spIds2 = new HashSet<>();
+ for(StatementPatternMetadata metadata: query2.getStatementPatternMetadata()) {
+ spIds2.add(metadata.getNodeId());
+ }
+
+ assertEquals(Sets.union(spIds1, spIds2), cache.getStatementPatternIds(fluoClient.newTransaction()));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a5086ee
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Valid levels:
+# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
+log4j.rootLogger=INFO, CONSOLE
+
+# Set independent logging levels
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.kafka=WARN
+#log4j.logger.org.apache.rya.indexing.pcj.fluo=DEBUG
+
+# LOGFILE is set to be a File appender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+#log4j.appender.CONSOLE.Threshold=DEBUG
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.default.file=org.apache.log4j.FileAppender
+#log4j.appender.default.file.file=/home/cloudera/Desktop/log.out
+#log4j.appender.default.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.default.file.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
+#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
index 32ee962..48334d0 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java
@@ -48,41 +48,42 @@ import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
-import org.apache.rya.accumulo.MiniAccumuloSingleton;
-import org.apache.rya.accumulo.RyaTestInstanceRule;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloInstall;
-import org.apache.zookeeper.ClientCnxn;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailException;
-
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.accumulo.MiniAccumuloSingleton;
+import org.apache.rya.accumulo.RyaTestInstanceRule;
import org.apache.rya.api.client.Install;
import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloInstall;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
import org.apache.rya.sail.config.RyaSailFactory;
+import org.apache.zookeeper.ClientCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
/**
* Integration tests that ensure the Fluo application processes PCJs results
@@ -156,6 +157,7 @@ public abstract class FluoITBase {
} catch (final Exception e) {
log.error("Could not shut down the Rya Connection.", e);
}
+
}
if (ryaRepo != null) {
@@ -187,6 +189,9 @@ public abstract class FluoITBase {
log.error("Could not shut down the Mini Fluo.", e);
}
}
+
+ StatementPatternIdCacheSupplier.clear();
+ MetadataCacheSupplier.clear();
}
protected void preFluoInitHook() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 7b16dcf..ca17b2a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -53,6 +53,7 @@ import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
@@ -64,11 +65,14 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.sail.config.RyaSailFactory;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.Sail;
@@ -115,6 +119,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverSpecification> observers = new ArrayList<>();
observers.add(new ObserverSpecification(TripleObserver.class.getName()));
+ observers.add(new ObserverSpecification(BatchObserver.class.getName()));
observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
@@ -185,6 +190,12 @@ public class KafkaExportITBase extends AccumuloExportITBase {
}
}
+ @After
+ public void clearCaches() {
+ StatementPatternIdCacheSupplier.clear();
+ MetadataCacheSupplier.clear();
+ }
+
private void installRyaInstance() throws Exception {
final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
final String instanceName = cluster.getInstanceName();
@@ -261,7 +272,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
* If this test fails then its a testing environment issue, not with Rya.
* Source: https://github.com/asmaier/mini-kafka
*/
-// @Test
+ @Test
public void embeddedKafkaTest() throws Exception {
// create topic
final String topic = "testTopic";
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
index 970cfd8..f540a2e 100644
--- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
+++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
@@ -132,6 +132,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
"PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " +
"SELECT ?cityA ?cityB " +
"WHERE { " +
+ "?cityA <urn:containedIn> ?continent. " +
+ "?cityB <urn:containedIn> ?continent. " +
"?cityA geo:asWKT ?coord1 . " +
"?cityB geo:asWKT ?coord2 . " +
// from brussels 173km to amsterdam
@@ -147,9 +149,13 @@ public class GeoFunctionsIT extends RyaExportITBase {
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), asWKT, vf.createLiteral("Point(-17.45 14.69)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
- vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)));
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")));
- // The expected results of the SPARQL query once the PCJ has been computed.
+ // The expected results of the SPARQL query once the PCJ has been computed.l
final Set<BindingSet> expectedResults = new HashSet<>();
MapBindingSet bs = new MapBindingSet();
@@ -234,6 +240,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
"PREFIX geof: <http://www.opengis.net/def/function/geosparql/> " +
"PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " +
"SELECT ?cityA ?cityB { " +
+ "?cityA <urn:containedIn> ?continent. " +
+ "?cityB <urn:containedIn> ?continent. " +
"?cityA geo:asWKT ?coord1 . " +
"?cityB geo:asWKT ?coord2 . " +
" FILTER ( geof:sfIntersects(?coord1, ?coord2) ) " +
@@ -248,6 +256,8 @@ public class GeoFunctionsIT extends RyaExportITBase {
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
+ vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")),
vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)));
// The expected results of the SPARQL query once the PCJ has been computed.