You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/04/24 15:06:27 UTC
[9/9] incubator-rya git commit: RYA-260 Fluo PCJ application has had
Aggregation support added to it. Also fixed a bunch of resource leaks that
were causing integration tests to fail. Closes #156.
RYA-260 Fluo PCJ application has had Aggregation support added to it. Also fixed a bunch of resource leaks that were causing integration tests to fail. Closes #156.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/c941aea8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/c941aea8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/c941aea8
Branch: refs/heads/master
Commit: c941aea8b65acb99b451757c48279914d9488c85
Parents: be9ea9a
Author: Kevin Chilton <ke...@parsons.com>
Authored: Fri Apr 7 15:57:57 2017 -0400
Committer: Caleb Meier <ca...@parsons.com>
Committed: Mon Apr 24 07:58:25 2017 -0700
----------------------------------------------------------------------
.../AccumuloRyaInstanceDetailsRepository.java | 20 +-
.../accumulo/utils/VisibilitySimplifier.java | 30 +-
.../utils/VisibilitySimplifierTest.java | 24 +
.../rya/accumulo/utils/RyaTableNames.java | 11 +-
.../client/accumulo/AccumuloBatchUpdatePCJ.java | 87 ++-
.../api/client/accumulo/AccumuloCreatePCJ.java | 86 ++-
.../api/client/accumulo/AccumuloDeletePCJ.java | 38 +-
.../pcj/matching/AccumuloIndexSetProvider.java | 62 +-
.../accumulo/AccumuloBatchUpdatePCJIT.java | 10 +-
.../client/accumulo/AccumuloCreatePCJIT.java | 72 +-
.../client/accumulo/AccumuloDeletePCJIT.java | 46 +-
.../benchmark/query/QueryBenchmarkRunIT.java | 29 +-
.../pcj/storage/PrecomputedJoinStorage.java | 22 +-
.../storage/accumulo/AccumuloPcjSerializer.java | 20 -
.../storage/accumulo/AccumuloPcjStorage.java | 8 +-
.../storage/accumulo/BindingSetConverter.java | 9 +-
.../accumulo/BindingSetStringConverter.java | 53 +-
.../pcj/storage/accumulo/PcjTables.java | 39 +-
.../accumulo/ScannerBindingSetIterator.java | 18 +-
.../pcj/storage/accumulo/VariableOrder.java | 15 +-
.../pcj/update/PrecomputedJoinUpdater.java | 10 +-
.../accumulo/AccumuloPcjSerializerTest.java | 185 +++++
.../accumulo/AccumuloPcjSerialzerTest.java | 175 -----
.../accumulo/BindingSetStringConverterTest.java | 42 +-
.../accumulo/PcjTablesIntegrationTest.java | 26 +-
.../accumulo/accumulo/AccumuloPcjStorageIT.java | 284 ++++----
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 277 ++++----
.../rya/indexing/pcj/fluo/api/DeletePcj.java | 28 +-
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 10 +-
.../pcj/fluo/app/AggregationResultUpdater.java | 572 +++++++++++++++
.../indexing/pcj/fluo/app/BindingSetRow.java | 11 +-
.../pcj/fluo/app/FilterResultUpdater.java | 57 +-
.../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 95 +--
.../fluo/app/IncrementalUpdateConstants.java | 1 +
.../pcj/fluo/app/JoinResultUpdater.java | 131 ++--
.../rya/indexing/pcj/fluo/app/NodeType.java | 11 +-
.../pcj/fluo/app/QueryResultUpdater.java | 69 +-
.../pcj/fluo/app/VisibilityBindingSetSerDe.java | 77 +++
.../app/export/IncrementalResultExporter.java | 12 +-
.../app/export/kafka/KafkaResultExporter.java | 32 +-
.../fluo/app/export/rya/RyaResultExporter.java | 16 +-
.../fluo/app/observers/AggregationObserver.java | 74 ++
.../fluo/app/observers/BindingSetUpdater.java | 53 +-
.../pcj/fluo/app/observers/FilterObserver.java | 25 +-
.../pcj/fluo/app/observers/JoinObserver.java | 24 +-
.../fluo/app/observers/QueryResultObserver.java | 54 +-
.../app/observers/StatementPatternObserver.java | 25 +-
.../pcj/fluo/app/observers/TripleObserver.java | 158 +++--
.../pcj/fluo/app/query/AggregationMetadata.java | 371 ++++++++++
.../indexing/pcj/fluo/app/query/FluoQuery.java | 111 ++-
.../pcj/fluo/app/query/FluoQueryColumns.java | 60 +-
.../fluo/app/query/FluoQueryMetadataDAO.java | 186 ++++-
.../fluo/app/query/SparqlFluoQueryBuilder.java | 116 +++-
.../pcj/fluo/app/util/BindingSetUtil.java | 54 ++
.../indexing/pcj/fluo/app/util/RowKeyUtil.java | 69 ++
.../fluo/app/VisibilityBindingSetSerDeTest.java | 51 ++
.../fluo/client/command/NewQueryCommand.java | 2 +-
.../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 38 +-
.../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 13 +-
.../apache/rya/indexing/pcj/fluo/ITBase.java | 443 ------------
.../indexing/pcj/fluo/KafkaExportITBase.java | 315 +++++++++
.../rya/indexing/pcj/fluo/RyaExportITBase.java | 182 +++++
.../pcj/fluo/api/CountStatementsIT.java | 54 +-
.../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 91 +--
.../indexing/pcj/fluo/api/GetQueryReportIT.java | 107 +--
.../indexing/pcj/fluo/api/ListQueryIdsIT.java | 35 +-
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 204 ++++--
.../pcj/fluo/integration/CreateDeleteIT.java | 166 +++--
.../indexing/pcj/fluo/integration/InputIT.java | 275 +++++---
.../pcj/fluo/integration/KafkaExportIT.java | 693 ++++++++++++-------
.../indexing/pcj/fluo/integration/QueryIT.java | 580 ++++++++--------
.../pcj/fluo/integration/RyaExportIT.java | 101 +--
.../RyaInputIncrementalUpdateIT.java | 245 ++++---
.../pcj/fluo/integration/StreamingTestIT.java | 140 ++--
.../HistoricStreamingVisibilityIT.java | 80 ++-
.../pcj/fluo/visibility/PcjVisibilityIT.java | 199 +++---
.../rya.pcj.fluo/rya.pcj.functions.geo/pom.xml | 44 +-
.../rya/indexing/pcj/fluo/RyaExportITBase.java | 182 +++++
.../pcj/functions/geo/GeoFunctionsIT.java | 471 ++++++-------
pom.xml | 16 +
80 files changed, 5719 insertions(+), 3208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
index be8e12c..dcd64de 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
@@ -22,9 +22,6 @@ import static java.util.Objects.requireNonNull;
import java.util.Map.Entry;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
@@ -48,6 +45,9 @@ import org.apache.hadoop.io.Text;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetailsRepository;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* An implementation of {@link RyaDetailsRepository} that stores a Rya
* instance's {@link RyaDetails} in an Accumulo table.
@@ -89,12 +89,17 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor
@Override
public boolean isInitialized() throws RyaDetailsRepositoryException {
+ Scanner scanner = null;
try {
- final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+ scanner = connector.createScanner(detailsTableName, new Authorizations());
scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
return scanner.iterator().hasNext();
} catch (final TableNotFoundException e) {
return false;
+ } finally {
+ if(scanner != null) {
+ scanner.close();
+ }
}
}
@@ -157,9 +162,10 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor
}
// Read it from the table.
+ Scanner scanner = null;
try {
// Fetch the value from the table.
- final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+ scanner = connector.createScanner(detailsTableName, new Authorizations());
scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
final Entry<Key, Value> entry = scanner.iterator().next();
@@ -169,6 +175,10 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor
} catch (final TableNotFoundException e) {
throw new RyaDetailsRepositoryException("Could not get the details from the table.", e);
+ } finally {
+ if(scanner != null) {
+ scanner.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
index 98c6abd..8fa3b0e 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java
@@ -20,13 +20,13 @@ package org.apache.rya.accumulo.utils;
import static java.util.Objects.requireNonNull;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.accumulo.core.security.ColumnVisibility;
import com.google.common.base.Charsets;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* Simplifies Accumulo visibility expressions.
*/
@@ -34,12 +34,34 @@ import com.google.common.base.Charsets;
public class VisibilitySimplifier {
/**
+ * Unions two visibility equations and then simplifies the result.
+ *
+ * @param vis1 - The first visibility equation that will be unioned. (not null)
+ * @param vis2 - The other visibility equation that will be unioned. (not null)
+ * @return A simplified form of the unioned visibility equations.
+ */
+ public static String unionAndSimplify(final String vis1, final String vis2) {
+ requireNonNull(vis1);
+ requireNonNull(vis2);
+
+ if(vis1.isEmpty()) {
+ return vis2;
+ }
+
+ if(vis2.isEmpty()) {
+ return vis1;
+ }
+
+ return simplify("(" + vis1 + ")&(" + vis2 + ")");
+ }
+
+ /**
* Simplifies an Accumulo visibility expression.
*
* @param visibility - The expression to simplify. (not null)
* @return A simplified form of {@code visibility}.
*/
- public String simplify(final String visibility) {
+ public static String simplify(final String visibility) {
requireNonNull(visibility);
String last = visibility;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java b/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
index a9a03ce..0adb325 100644
--- a/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
+++ b/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java
@@ -50,4 +50,28 @@ public class VisibilitySimplifierTest {
final String simplified = new VisibilitySimplifier().simplify("(a|b)|(a|b)|a|b");
assertEquals("a|b", simplified);
}
+
+ @Test
+ public void unionAndSimplify() {
+ final String simplified = new VisibilitySimplifier().unionAndSimplify("u&b", "u");
+ assertEquals("b&u", simplified);
+ }
+
+ @Test
+ public void unionAndSimplify_firstIsEmpty() {
+ final String simplified = new VisibilitySimplifier().unionAndSimplify("", "u");
+ assertEquals("u", simplified);
+ }
+
+ @Test
+ public void unionAndSimplify_secondIsEmpty() {
+ final String simplified = new VisibilitySimplifier().unionAndSimplify("u", "");
+ assertEquals("u", simplified);
+ }
+
+ @Test
+ public void unionAndSimplify_bothAreEmpty() {
+ final String simplified = new VisibilitySimplifier().unionAndSimplify("", "");
+ assertEquals("", simplified);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
index faeebbb..cd17cbc 100644
--- a/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
+++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java
@@ -34,6 +34,7 @@ import org.apache.rya.api.layout.TablePrefixLayoutStrategy;
import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
@@ -93,11 +94,13 @@ public class RyaTableNames {
*/
if(details.getPCJIndexDetails().isEnabled()) {
- final List<String> pcjIds = new AccumuloPcjStorage(conn, ryaInstanceName).listPcjs();
+ try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(conn, ryaInstanceName)) {
+ final List<String> pcjIds = pcjStorage.listPcjs();
- final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory();
- for(final String pcjId : pcjIds) {
- tables.add( tableNameFactory.makeTableName(ryaInstanceName, pcjId) );
+ final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory();
+ for(final String pcjId : pcjIds) {
+ tables.add( tableNameFactory.makeTableName(ryaInstanceName, pcjId) );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
index 790fe80..76aad02 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
@@ -28,23 +28,6 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-import org.openrdf.sail.Sail;
-import org.openrdf.sail.SailConnection;
-import org.openrdf.sail.SailException;
-
-import com.google.common.base.Optional;
-
-import info.aduna.iteration.CloseableIteration;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
import org.apache.rya.api.client.BatchUpdatePCJ;
@@ -62,8 +45,25 @@ import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+import info.aduna.iteration.CloseableIteration;
/**
* Uses an in memory Rya Client to batch update a PCJ index.
@@ -126,12 +126,11 @@ public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpda
SailConnection sailConn = null;
CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null;
- try {
+ try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName)) {
// Create an instance of Sail backed by the Rya instance.
sail = connectToRya(ryaInstanceName);
// Purge the old results from the PCJ.
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName);
try {
pcjStorage.purge(pcjId);
} catch (final PCJStorageException e) {
@@ -139,37 +138,35 @@ public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpda
"results could not be purged from it.", e);
}
- try {
- // Parse the PCJ's SPARQL query.
- final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
- final String sparql = metadata.getSparql();
- final SPARQLParser parser = new SPARQLParser();
- final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
-
- // Execute the query.
- sailConn = sail.getConnection();
- results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
-
- // Load the results into the PCJ table.
- final List<VisibilityBindingSet> batch = new ArrayList<>(1000);
-
- while(results.hasNext()) {
- final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), "");
- batch.add(result);
-
- if(batch.size() == 1000) {
- pcjStorage.addResults(pcjId, batch);
- batch.clear();
- }
- }
+ // Parse the PCJ's SPARQL query.
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+ final String sparql = metadata.getSparql();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
- if(!batch.isEmpty()) {
+ // Execute the query.
+ sailConn = sail.getConnection();
+ results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+ // Load the results into the PCJ table.
+ final List<VisibilityBindingSet> batch = new ArrayList<>(1000);
+
+ while(results.hasNext()) {
+ final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), "");
+ batch.add(result);
+
+ if(batch.size() == 1000) {
pcjStorage.addResults(pcjId, batch);
batch.clear();
}
- } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) {
- throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
}
+
+ if(!batch.isEmpty()) {
+ pcjStorage.addResults(pcjId, batch);
+ batch.clear();
+ }
+ } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) {
+ throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
} finally {
if(results != null) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
index ac8da66..3fe1042 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -90,47 +90,46 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
// Create the PCJ table that will receive the index results.
final String pcjId;
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName);
- try {
+ try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName)) {
pcjId = pcjStorage.createPcj(sparql);
- } catch (final PCJStorageException e) {
- throw new RyaClientException("Problem while initializing the PCJ table.", e);
- }
- // If a Fluo application is being used, task it with updating the PCJ.
- final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
- if(fluoDetailsHolder.isPresent()) {
- final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
- try {
- updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
- } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
- throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+ // If a Fluo application is being used, task it with updating the PCJ.
+ final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+ if(fluoDetailsHolder.isPresent()) {
+ final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+ try {
+ updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
+ } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
+ throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+ }
+
+ // Update the Rya Details to indicate the PCJ is being updated incrementally.
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
+ try {
+ new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+ @Override
+ public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+ // Update the original PCJ Details to indicate they are incrementally updated.
+ final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+ final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+ .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
+
+ // Replace the old PCJ Details with the updated ones.
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+ builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+ return builder.build();
+ }
+ });
+ } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+ throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+ }
}
- // Update the Rya Details to indicate the PCJ is being updated incrementally.
- final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
- try {
- new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
- @Override
- public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
- // Update the original PCJ Details to indicate they are incrementally updated.
- final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
- final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
- .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
-
- // Replace the old PCJ Details with the updated ones.
- final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
- builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
- return builder.build();
- }
- });
- } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
- throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
- }
+ // Return the ID that was assigned to the PCJ.
+ return pcjId;
+ } catch (final PCJStorageException e) {
+ throw new RyaClientException("Problem while initializing the PCJ table.", e);
}
-
- // Return the ID that was assigned to the PCJ.
- return pcjId;
}
private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException {
@@ -139,16 +138,15 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
// Connect to the Fluo application that is updating this instance's PCJs.
final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
- final FluoClient fluoClient = new FluoClientFactory().connect(
+ try(final FluoClient fluoClient = new FluoClientFactory().connect(
cd.getUsername(),
new String(cd.getPassword()),
cd.getInstanceName(),
cd.getZookeepers(),
- fluoAppName);
-
- // Initialize the PCJ within the Fluo application.
- final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj();
- fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
+ fluoAppName);) {
+ // Initialize the PCJ within the Fluo application.
+ final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj();
+ fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
+ }
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
index b6728ec..96e6d58 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
@@ -20,19 +20,7 @@ package org.apache.rya.api.client.accumulo;
import static java.util.Objects.requireNonNull;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.accumulo.core.client.Connector;
-import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-
import org.apache.fluo.api.client.FluoClient;
import org.apache.rya.api.client.DeletePCJ;
import org.apache.rya.api.client.GetInstanceDetails;
@@ -43,6 +31,17 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
/**
* An Accumulo implementation of the {@link DeletePCJ} command.
@@ -104,8 +103,7 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
}
// Drop the table that holds the PCJ results from Accumulo.
- final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName);
- try {
+ try(final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName)) {
pcjs.dropPcj(pcjId);
} catch (final PCJStorageException e) {
throw new RyaClientException("Could not drop the PCJ's table from Accumulo.", e);
@@ -118,14 +116,14 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
// Connect to the Fluo application that is updating this instance's PCJs.
final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
- final FluoClient fluoClient = new FluoClientFactory().connect(
+ try(final FluoClient fluoClient = new FluoClientFactory().connect(
cd.getUsername(),
new String(cd.getPassword()),
cd.getInstanceName(),
cd.getZookeepers(),
- fluoAppName);
-
- // Delete the PCJ from the Fluo App.
- new DeletePcj(1000).deletePcj(fluoClient, pcjId);
+ fluoAppName)) {
+ // Delete the PCJ from the Fluo App.
+ new DeletePcj(1000).deletePcj(fluoClient, pcjId);
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
index 828ee4b..1940e64 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
@@ -17,24 +17,6 @@
* under the License.
*/
package org.apache.rya.indexing.pcj.matching;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
import static java.util.Objects.requireNonNull;
@@ -184,29 +166,31 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup
}
// this maps associates pcj table name with pcj sparql query
final Map<String, String> indexTables = Maps.newLinkedHashMap();
- final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix);
- final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
- final boolean tablesProvided = tables != null && !tables.isEmpty();
+ try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) {
+ final PcjTableNameFactory pcjFactory = new PcjTableNameFactory();
- if (tablesProvided) {
- // if tables provided, associate table name with sparql
- for (final String table : tables) {
- indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
- }
- } else if (hasRyaDetails(tablePrefix, conn)) {
- // If this is a newer install of Rya, and it has PCJ Details, then
- // use those.
- final List<String> ids = storage.listPcjs();
- for (final String id : ids) {
- indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
- }
- } else {
- // Otherwise figure it out by scanning tables.
- final PcjTables pcjTables = new PcjTables();
- for (final String table : conn.tableOperations().list()) {
- if (table.startsWith(tablePrefix + "INDEX")) {
- indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
+ final boolean tablesProvided = tables != null && !tables.isEmpty();
+
+ if (tablesProvided) {
+ // if tables provided, associate table name with sparql
+ for (final String table : tables) {
+ indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql());
+ }
+ } else if (hasRyaDetails(tablePrefix, conn)) {
+ // If this is a newer install of Rya, and it has PCJ Details, then
+ // use those.
+ final List<String> ids = storage.listPcjs();
+ for (final String id : ids) {
+ indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql());
+ }
+ } else {
+ // Otherwise figure it out by scanning tables.
+ final PcjTables pcjTables = new PcjTables();
+ for (final String table : conn.tableOperations().list()) {
+ if (table.startsWith(tablePrefix + "INDEX")) {
+ indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
index 30eb4ca..5a2e69d 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -31,6 +31,7 @@ import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.sail.config.RyaSailFactory;
import org.junit.Test;
@@ -63,7 +64,7 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
.build());
Sail sail = null;
- try {
+ try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME)) {
// Get a Sail connection backed by the installed Rya instance.
final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
@@ -102,7 +103,6 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
sailConn.close();
// Create a PCJ for a SPARQL query.
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME);
final String sparql = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }";
final String pcjId = pcjStorage.createPcj(sparql);
@@ -137,8 +137,10 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
expectedResults.add(bs);
final Set<BindingSet> results = new HashSet<>();
- for(final BindingSet result : pcjStorage.listResults(pcjId)) {
- results.add( result );
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
}
assertEquals(expectedResults, results);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
index 6c9bf5e..f900837 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
@@ -24,6 +24,15 @@ import static org.junit.Assert.assertFalse;
import java.util.List;
import java.util.Set;
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -36,16 +45,6 @@ import org.openrdf.query.impl.MapBindingSet;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
-import org.apache.rya.api.client.CreatePCJ;
-import org.apache.rya.api.client.Install;
-import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.InstanceDoesNotExistException;
-import org.apache.rya.api.client.RyaClientException;
-import org.apache.rya.api.instance.RyaDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
-
/**
* Integration tests the methods of {@link AccumuloCreatePCJ}.
*/
@@ -80,42 +79,43 @@ public class AccumuloCreatePCJIT extends FluoITBase {
assertEquals(PCJUpdateStrategy.INCREMENTAL, pcjDetails.getUpdateStrategy().get());
// Verify the PCJ's metadata was initialized.
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
- final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
- assertEquals(sparql, pcjMetadata.getSparql());
- assertEquals(0L, pcjMetadata.getCardinality());
+ try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) {
+ final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+ assertEquals(sparql, pcjMetadata.getSparql());
+ assertEquals(0L, pcjMetadata.getCardinality());
- // Verify a Query ID was added for the query within the Fluo app.
- final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
- assertEquals(1, fluoQueryIds.size());
+ // Verify a Query ID was added for the query within the Fluo app.
+ final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
+ assertEquals(1, fluoQueryIds.size());
- // Insert some statements into Rya.
- final ValueFactory vf = ryaRepo.getValueFactory();
- ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
- ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
- ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
+ // Insert some statements into Rya.
+ final ValueFactory vf = ryaRepo.getValueFactory();
+ ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
+ ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
+ ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve"));
- ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin"));
+ ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin"));
- ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
- ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
- ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
- ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+ ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+ ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+ ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
+ ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint"));
- // Verify the correct results were exported.
- fluo.waitForObservers();
+ // Verify the correct results were exported.
+ fluo.waitForObservers();
- final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
+ final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
- final MapBindingSet bob = new MapBindingSet();
- bob.addBinding("x", vf.createURI("http://Bob"));
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("x", vf.createURI("http://Bob"));
- final MapBindingSet charlie = new MapBindingSet();
- charlie.addBinding("x", vf.createURI("http://Charlie"));
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("x", vf.createURI("http://Charlie"));
- final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
- assertEquals(expected, results);
+ assertEquals(expected, results);
+ }
}
@Test(expected = InstanceDoesNotExistException.class)
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
index 573fccd..fd75167 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Set;
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.DeletePCJ;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
@@ -36,11 +40,6 @@ import org.openrdf.repository.RepositoryException;
import com.google.common.collect.Sets;
-import org.apache.rya.api.client.CreatePCJ;
-import org.apache.rya.api.client.DeletePCJ;
-import org.apache.rya.api.client.InstanceDoesNotExistException;
-import org.apache.rya.api.client.RyaClientException;
-
/**
* Integration tests the methods of {@link AccumuloCreatePCJ}.
*/
@@ -86,31 +85,32 @@ public class AccumuloDeletePCJIT extends FluoITBase {
// Verify the correct results were exported.
fluo.waitForObservers();
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
- final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
+ try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) {
+ final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
- final MapBindingSet bob = new MapBindingSet();
- bob.addBinding("x", vf.createURI("http://Bob"));
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("x", vf.createURI("http://Bob"));
- final MapBindingSet charlie = new MapBindingSet();
- charlie.addBinding("x", vf.createURI("http://Charlie"));
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("x", vf.createURI("http://Charlie"));
- final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
- assertEquals(expected, results);
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie);
+ assertEquals(expected, results);
- // Delete the PCJ.
- final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn);
- deletePCJ.deletePCJ(RYA_INSTANCE_NAME, pcjId);
+ // Delete the PCJ.
+ final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn);
+ deletePCJ.deletePCJ(RYA_INSTANCE_NAME, pcjId);
- // Ensure the PCJ's metadata has been removed from the storage.
- assertTrue( pcjStorage.listPcjs().isEmpty() );
+ // Ensure the PCJ's metadata has been removed from the storage.
+ assertTrue( pcjStorage.listPcjs().isEmpty() );
- // Ensure the PCJ has been removed from the Fluo application.
- fluo.waitForObservers();
+ // Ensure the PCJ has been removed from the Fluo application.
+ fluo.waitForObservers();
- // Verify a Query ID was added for the query within the Fluo app.
- fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
- assertEquals(0, fluoQueryIds.size());
+ // Verify a Query ID was added for the query within the Fluo app.
+ fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
+ assertEquals(0, fluoQueryIds.size());
+ }
}
@Test(expected = InstanceDoesNotExistException.class)
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
index c36cb2c..dd5fe68 100644
--- a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
+++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
@@ -25,10 +25,19 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun;
import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.sail.config.RyaSailFactory;
import org.apache.zookeeper.ClientCnxn;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -38,16 +47,6 @@ import org.openrdf.sail.Sail;
import org.openrdf.sail.SailConnection;
import org.openrdf.sail.SailException;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
-import org.apache.rya.sail.config.RyaSailFactory;
-
/**
* Integration tests {@link QueryBenchmarkRun}.
*/
@@ -145,12 +144,12 @@ public class QueryBenchmarkRunIT {
private static void createTestPCJ(final RyaClient ryaClient) throws Exception {
// Create an empty PCJ within the Rya instance's PCJ storage for the test query.
- final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD), RYA_INSTANCE_NAME);
- final String pcjId = pcjs.createPcj(SPARQL_QUERY);
+ try(final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD), RYA_INSTANCE_NAME)) {
+ final String pcjId = pcjs.createPcj(SPARQL_QUERY);
-
- // Batch update the PCJ using the Rya Client.
- ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+ // Batch update the PCJ using the Rya Client.
+ ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+ }
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
index 16653ee..38ae1b2 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java
@@ -22,17 +22,17 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.openrdf.query.BindingSet;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* Functions that create and maintain the PCJ tables that are used by Rya.
*/
@DefaultAnnotation(NonNull.class)
-public interface PrecomputedJoinStorage {
+public interface PrecomputedJoinStorage extends AutoCloseable {
/**
* Get a list of all Precomputed Join indices that are being maintained.
@@ -75,7 +75,7 @@ public interface PrecomputedJoinStorage {
* results for the PCJ.
* @throws PCJStorageException The scan couldn't be performed.
*/
- public Iterable<BindingSet> listResults(String pcjId) throws PCJStorageException;
+ public CloseableIterator<BindingSet> listResults(String pcjId) throws PCJStorageException;
/**
* Clears all values from a Precomputed Join index. The index will remain,
@@ -94,15 +94,25 @@ public interface PrecomputedJoinStorage {
*/
public void dropPcj(final String pcjId) throws PCJStorageException;
-
/**
* Releases and resources that are being used by the storage.
*
* @throws PCJStorageException Indicates the resources could not be released.
*/
+ @Override
public void close() throws PCJStorageException;
/**
+ * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources
+ * that need to be released once you are done iterating.
+ *
+ * @param <T> The type of object that is iterated over.
+ */
+ public static interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+
+ }
+
+ /**
* An operation of {@link PrecomputedJoinStorage} failed.
*/
public static class PCJStorageException extends PcjException {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
index 4769758..999b26f 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java
@@ -34,7 +34,6 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import org.openrdf.model.Value;
-import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
@@ -57,7 +56,6 @@ public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> {
public byte[] convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException {
checkNotNull(bindingSet);
checkNotNull(varOrder);
- checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
// A list that holds all of the byte segments that will be concatenated at the end.
// This minimizes byte[] construction.
@@ -112,24 +110,6 @@ public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> {
}
}
- /**
- * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
- * are a subset of the variables names in {@link VariableOrder}.
- *
- * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
- * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
- * @throws IllegalArgumentException Indicates the names of the bindings are
- * not a subset of the variable order.
- */
- private static void checkBindingsSubsetOfVarOrder(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException {
- checkNotNull(bindingSet);
- checkNotNull(varOrder);
-
- Set<String> bindingNames = bindingSet.getBindingNames();
- List<String> varNames = varOrder.getVariableOrders();
- checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
- }
-
private static final byte[] concat(Iterable<byte[]> byteSegments) {
checkNotNull(byteSegments);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
index 6024d12..b8974e6 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java
@@ -25,9 +25,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
@@ -47,6 +44,9 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* An Accumulo backed implementation of {@link PrecomputedJoinStorage}.
*/
@@ -156,7 +156,7 @@ public class AccumuloPcjStorage implements PrecomputedJoinStorage {
}
@Override
- public Iterable<BindingSet> listResults(final String pcjId) throws PCJStorageException {
+ public CloseableIterator<BindingSet> listResults(final String pcjId) throws PCJStorageException {
requireNonNull(pcjId);
try {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
index d2cf366..c920824 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java
@@ -18,12 +18,12 @@
*/
package org.apache.rya.indexing.pcj.storage.accumulo;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* Converts {@link BindingSet}s into other representations. This library is
* intended to convert between BindingSet and whatever format it is being
@@ -52,8 +52,7 @@ public interface BindingSetConverter<T> {
* resulting model. (not null)
* @return The BindingSet formatted as the target model.
* @throws BindingSetConversionException The BindingSet was unable to be
- * converted into the target model. This will happen if the BindingSet has
- * a binding whose name is not in the VariableOrder or if one of the values
+ * converted into the target model. This will happen if one of the values
* could not be converted into the target model.
*/
public T convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
index b2d04e1..4120fd9 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java
@@ -19,29 +19,27 @@
package org.apache.rya.indexing.pcj.storage.accumulo;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
import com.google.common.base.Joiner;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.resolver.RdfToRyaConversions;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
/**
* Converts {@link BindingSet}s to Strings and back again. The Strings do not
@@ -58,7 +56,8 @@ public class BindingSetStringConverter implements BindingSetConverter<String> {
@Override
public String convert(final BindingSet bindingSet, final VariableOrder varOrder) {
- checkBindingsSubsetOfVarOrder(bindingSet, varOrder);
+ requireNonNull(bindingSet);
+ requireNonNull(varOrder);
// Convert each Binding to a String.
final List<String> bindingStrings = new ArrayList<>();
@@ -79,38 +78,26 @@ public class BindingSetStringConverter implements BindingSetConverter<String> {
return Joiner.on(BINDING_DELIM).join(bindingStrings);
}
- /**
- * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet}
- * are a subset of the variables names in {@link VariableOrder}.
- *
- * @param bindingSet - The binding set whose Bindings will be inspected. (not null)
- * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null)
- * @throws IllegalArgumentException Indicates the names of the bindings are
- * not a subset of the variable order.
- */
- private static void checkBindingsSubsetOfVarOrder(final BindingSet bindingSet, final VariableOrder varOrder) throws IllegalArgumentException {
- checkNotNull(bindingSet);
- checkNotNull(varOrder);
-
- final Set<String> bindingNames = bindingSet.getBindingNames();
- final List<String> varNames = varOrder.getVariableOrders();
- checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder.");
- }
-
@Override
public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) {
- checkNotNull(bindingSetString);
- checkNotNull(varOrder);
+ requireNonNull(bindingSetString);
+ requireNonNull(varOrder);
+
+ // If both are empty, return an empty binding set.
+ if(bindingSetString.isEmpty() && varOrder.toString().isEmpty()) {
+ return new MapBindingSet();
+ }
+ // Otherwise parse it.
final String[] bindingStrings = bindingSetString.split(BINDING_DELIM);
- final String[] varOrrderArr = varOrder.toArray();
- checkArgument(varOrrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder.");
+ final String[] varOrderArr = varOrder.toArray();
+ checkArgument(varOrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder.");
final QueryBindingSet bindingSet = new QueryBindingSet();
for(int i = 0; i < bindingStrings.length; i++) {
final String bindingString = bindingStrings[i];
if(!NULL_VALUE_STRING.equals(bindingString)) {
- final String name = varOrrderArr[i];
+ final String name = varOrderArr[i];
final Value value = toValue(bindingStrings[i]);
bindingSet.addBinding(name, value);
}
@@ -125,7 +112,7 @@ public class BindingSetStringConverter implements BindingSetConverter<String> {
* @return The {@link Value} representation of the String.
*/
protected static Value toValue(final String valueString) {
- checkNotNull(valueString);
+ requireNonNull(valueString);
// Split the String that was stored in Fluo into its Value and Type parts.
final String[] valueAndType = valueString.split(TYPE_DELIM);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
index ce3e5d1..5d13597 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -30,9 +30,6 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
@@ -59,6 +56,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.query.BindingSet;
@@ -72,6 +70,9 @@ import org.openrdf.repository.RepositoryException;
import com.google.common.base.Optional;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* Functions that create and maintain the PCJ tables that are used by Rya.
*/
@@ -157,6 +158,7 @@ public class PcjTables {
final TableOperations tableOps = accumuloConn.tableOperations();
if(!tableOps.exists(pcjTableName)) {
+ BatchWriter writer = null;
try {
// Create the new table in Accumulo.
tableOps.create(pcjTableName);
@@ -165,14 +167,21 @@ public class PcjTables {
final PcjMetadata pcjMetadata = new PcjMetadata(sparql, 0L, varOrders);
final List<Mutation> mutations = makeWriteMetadataMutations(pcjMetadata);
- final BatchWriter writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
+ writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig());
writer.addMutations(mutations);
- writer.close();
} catch (final TableExistsException e) {
log.warn("Something else just created the Rya PCJ export table named '" + pcjTableName
+ "'. This is unexpected, but we will continue as normal.");
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
throw new PCJStorageException("Could not create a new PCJ named: " + pcjTableName, e);
+ } finally {
+ if(writer != null) {
+ try {
+ writer.close();
+ } catch (final MutationsRejectedException e) {
+ log.error("Mutations rejected while creating the PCJ table.", e);
+ }
+ }
}
}
}
@@ -231,9 +240,10 @@ public class PcjTables {
checkNotNull(accumuloConn);
checkNotNull(pcjTableName);
+ Scanner scanner = null;
try {
// Create an Accumulo scanner that iterates through the metadata entries.
- final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
+ scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
final Iterator<Entry<Key, Value>> entries = scanner.iterator();
// No metadata has been stored in the table yet.
@@ -266,6 +276,10 @@ public class PcjTables {
} catch (final TableNotFoundException e) {
throw new PCJStorageException("Could not add results to a PCJ because the PCJ table does not exist.", e);
+ } finally {
+ if(scanner != null) {
+ scanner.close();
+ }
}
}
@@ -310,7 +324,7 @@ public class PcjTables {
* results for the PCJ.
* @throws PCJStorageException The binding sets could not be fetched.
*/
- public Iterable<BindingSet> listResults(final Connector accumuloConn, final String pcjTableName, final Authorizations auths) throws PCJStorageException {
+ public CloseableIterator<BindingSet> listResults(final Connector accumuloConn, final String pcjTableName, final Authorizations auths) throws PCJStorageException {
requireNonNull(pcjTableName);
// Fetch the Variable Orders for the binding sets and choose one of them. It
@@ -324,12 +338,7 @@ public class PcjTables {
scanner.fetchColumnFamily( new Text(varOrder.toString()) );
// Return an Iterator that uses that scanner.
- return new Iterable<BindingSet>() {
- @Override
- public Iterator<BindingSet> iterator() {
- return new ScannerBindingSetIterator(scanner, varOrder);
- }
- };
+ return new ScannerBindingSetIterator(scanner, varOrder);
} catch (final TableNotFoundException e) {
throw new PCJStorageException(String.format("PCJ Table does not exist for name '%s'.", pcjTableName), e);
@@ -398,10 +407,10 @@ public class PcjTables {
for(final VariableOrder varOrder : varOrders) {
try {
// Serialize the result to the variable order.
- final byte[] serializedResult = converter.convert(result, varOrder);
+ final byte[] rowKey = converter.convert(result, varOrder);
// Row ID = binding set values, Column Family = variable order of the binding set.
- final Mutation addResult = new Mutation(serializedResult);
+ final Mutation addResult = new Mutation(rowKey);
final String visibility = result.getVisibility();
addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), "");
mutations.add(addResult);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
index d0fd7bf..26fd8c9 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java
@@ -20,27 +20,30 @@ package org.apache.rya.indexing.pcj.storage.accumulo;
import static java.util.Objects.requireNonNull;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.query.BindingSet;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
/**
* Iterates over the results of a {@link Scanner} assuming the results are
* binding sets that can be converted using a {@link AccumuloPcjSerializer}.
*/
@DefaultAnnotation(NonNull.class)
-public class ScannerBindingSetIterator implements Iterator<BindingSet> {
+public class ScannerBindingSetIterator implements CloseableIterator<BindingSet> {
private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+ private final Scanner scanner;
private final Iterator<Entry<Key, Value>> accEntries;
private final VariableOrder varOrder;
@@ -51,7 +54,7 @@ public class ScannerBindingSetIterator implements Iterator<BindingSet> {
* @param varOrder - The variable order of the binding sets the scanner returns. (not null)
*/
public ScannerBindingSetIterator(final Scanner scanner, final VariableOrder varOrder) {
- requireNonNull(scanner);
+ this.scanner = requireNonNull(scanner);
this.accEntries = scanner.iterator();
this.varOrder = requireNonNull(varOrder);
}
@@ -71,4 +74,9 @@ public class ScannerBindingSetIterator implements Iterator<BindingSet> {
throw new RuntimeException("Could not deserialize a BindingSet from Accumulo.", e);
}
}
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
index 6ec801e..151db50 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java
@@ -23,15 +23,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
import java.util.Iterator;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
import org.openrdf.query.BindingSet;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import net.jcip.annotations.Immutable;
+
/**
* An ordered list of {@link BindingSet} variable names. These are used to
* specify the order {@link Binding}s within the set are serialized to Accumulo.
@@ -46,6 +46,13 @@ public final class VariableOrder implements Iterable<String> {
private final ImmutableList<String> variableOrder;
/**
+ * Constructs an instance of {@link VariableOrder} when there are no variables.
+ */
+ public VariableOrder() {
+ variableOrder = ImmutableList.of();
+ }
+
+ /**
* Constructs an instance of {@link VariableOrder}.
*
* @param varOrder - An ordered array of Binding Set variables. (not null)
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
index 2baa52e..f67110e 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java
@@ -20,19 +20,18 @@ package org.apache.rya.indexing.pcj.update;
import java.util.Collection;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.api.domain.RyaStatement;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
/**
* Updates the state of all PCJ indices whenever {@link RyaStatement}s are
* added to or removed from the system.
*/
@DefaultAnnotation(NonNull.class)
-public interface PrecomputedJoinUpdater {
+public interface PrecomputedJoinUpdater extends AutoCloseable {
/**
* The PCJ indices will be updated to include new statements within
@@ -80,6 +79,7 @@ public interface PrecomputedJoinUpdater {
*
* @throws PcjUpdateException The updater could not be closed.
*/
+ @Override
public void close() throws PcjUpdateException;
/**