You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/13 19:05:17 UTC
[4/7] incubator-rya git commit: RYA-142 Stopped using Fluo TypeLayer
RYA-142 Stopped using Fluo TypeLayer
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/177c80a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/177c80a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/177c80a1
Branch: refs/heads/master
Commit: 177c80a1ed9845f7914610714cb7084d4877cf76
Parents: ca2743a
Author: Keith Turner <ke...@deenlo.com>
Authored: Thu Oct 6 18:55:34 2016 -0400
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Thu Oct 13 12:55:44 2016 -0400
----------------------------------------------------------------------
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 22 ++--
.../rya/indexing/pcj/fluo/api/DeletePcj.java | 30 +++---
.../indexing/pcj/fluo/api/GetPcjMetadata.java | 12 +--
.../indexing/pcj/fluo/api/InsertTriples.java | 19 +---
.../rya/indexing/pcj/fluo/api/ListQueryIds.java | 5 +-
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 4 -
.../pcj/fluo/app/FilterResultUpdater.java | 8 +-
.../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 9 +-
.../pcj/fluo/app/JoinResultUpdater.java | 10 +-
.../pcj/fluo/app/QueryResultUpdater.java | 10 +-
.../indexing/pcj/fluo/app/StringTypeLayer.java | 29 ------
.../app/export/IncrementalResultExporter.java | 4 +-
.../fluo/app/export/rya/RyaResultExporter.java | 7 +-
.../fluo/app/observers/BindingSetUpdater.java | 12 +--
.../fluo/app/observers/QueryResultObserver.java | 18 ++--
.../pcj/fluo/app/observers/TripleObserver.java | 30 ++----
.../fluo/app/query/FluoQueryMetadataDAO.java | 100 +++++++++----------
.../indexing/pcj/fluo/api/ListQueryIdsIT.java | 13 ++-
pom.xml | 6 --
19 files changed, 127 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 943a022..d31e578 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -29,7 +29,6 @@ import java.util.Set;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
-import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
@@ -55,7 +54,7 @@ import org.openrdf.sail.SailException;
import info.aduna.iteration.CloseableIteration;
import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.recipes.core.types.TypedTransaction;
+import org.apache.fluo.api.client.Transaction;
/**
* Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
@@ -76,11 +75,6 @@ import org.apache.fluo.recipes.core.types.TypedTransaction;
public class CreatePcj {
/**
- * Wraps Fluo {@link Transaction}s so that we can write String values to them.
- */
- private static final StringTypeLayer STRING_TYPED_LAYER = new StringTypeLayer();
-
- /**
* The default Statement Pattern batch insert size is 1000.
*/
private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
@@ -150,15 +144,15 @@ public class CreatePcj {
final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
- try(TypedTransaction tx = STRING_TYPED_LAYER.wrap( fluo.newTransaction() )) {
+ try(Transaction tx = fluo.newTransaction()) {
// Write the query's structure to Fluo.
new FluoQueryMetadataDAO().write(tx, fluoQuery);
// The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ.
final String queryId = fluoQuery.getQueryMetadata().getNodeId();
- tx.mutate().row(queryId).col(FluoQueryColumns.RYA_PCJ_ID).set(pcjId);
- tx.mutate().row(pcjId).col(FluoQueryColumns.PCJ_ID_QUERY_ID).set(queryId);
-
+ tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
+ tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
+
// Flush the changes to Fluo.
tx.commit();
}
@@ -206,7 +200,7 @@ public class CreatePcj {
final BindingSetStringConverter converter = new BindingSetStringConverter();
- try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) {
+ try(Transaction tx = fluo.newTransaction()) {
// Get the node's variable order.
final String spNodeId = spMetadata.getNodeId();
final VariableOrder varOrder = spMetadata.getVariableOrder();
@@ -221,9 +215,7 @@ public class CreatePcj {
final String bindingSetStr = converter.convert(spBindingSet, varOrder);
// Write the binding set entry to Fluo for the statement pattern.
- tx.mutate().row(spNodeId + NODEID_BS_DELIM + bindingSetStr)
- .col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET)
- .set(bindingSetStr);
+ tx.set(spNodeId + NODEID_BS_DELIM + bindingSetStr, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, bindingSetStr);
}
tx.commit();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
index e1a9b8e..79ca0ea 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
@@ -28,7 +28,6 @@ import java.util.List;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
@@ -43,7 +42,6 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.recipes.core.types.TypedTransaction;
/**
* Deletes a Pre-computed Join (PCJ) from Fluo.
@@ -174,7 +172,7 @@ public class DeletePcj {
requireNonNull(nodeIds);
requireNonNull(pcjId);
- try (final TypedTransaction typeTx = new StringTypeLayer().wrap(tx)) {
+ try (final Transaction typeTx = tx) {
deletePcjIdAndSparqlMetadata(typeTx, pcjId);
for (final String nodeId : nodeIds) {
@@ -192,7 +190,7 @@ public class DeletePcj {
* @param nodeId - The Node ID of the query node to delete. (not null)
* @param columns - The columns that will be deleted. (not null)
*/
- private void deleteMetadataColumns(final TypedTransaction tx, final String nodeId, final List<Column> columns) {
+ private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) {
requireNonNull(tx);
requireNonNull(columns);
requireNonNull(nodeId);
@@ -211,15 +209,15 @@ public class DeletePcj {
* @param tx - Transaction the deletes will be performed with. (not null)
* @param pcjId - The PCJ whose metadata will be deleted. (not null)
*/
- private void deletePcjIdAndSparqlMetadata(final TypedTransaction tx, final String pcjId) {
+ private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
requireNonNull(tx);
requireNonNull(pcjId);
final String queryId = getQueryIdFromPcjId(tx, pcjId);
final String sparql = getSparqlFromQueryId(tx, queryId);
- tx.delete(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID);
- tx.delete(Bytes.of(sparql), FluoQueryColumns.QUERY_ID);
- tx.delete(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
+ tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID);
+ tx.delete(sparql, FluoQueryColumns.QUERY_ID);
+ tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
}
@@ -253,17 +251,19 @@ public class DeletePcj {
requireNonNull(scanner);
requireNonNull(column);
- int count = 0;
- Iterator<RowColumnValue> iter = scanner.iterator();
- while (iter.hasNext() && count < batchSize) {
+ try(Transaction ntx = tx) {
+ int count = 0;
+ Iterator<RowColumnValue> iter = scanner.iterator();
+ while (iter.hasNext() && count < batchSize) {
final Bytes row = iter.next().getRow();
count++;
tx.delete(row, column);
- }
+ }
- final boolean hasNext = iter.hasNext();
- tx.commit();
- return hasNext;
+ final boolean hasNext = iter.hasNext();
+ tx.commit();
+ return hasNext;
+ }
}
private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
index d8c800e..061a1d5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
@@ -24,15 +24,14 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.core.types.TypedSnapshot;
/**
* Get {@link PcjMetadata} for queries that are managed by the Fluo app.
@@ -87,13 +86,12 @@ public class GetPcjMetadata {
// Lookup the Rya PCJ ID associated with the query.
String pcjId = null;
- try(TypedSnapshot snap = new StringTypeLayer().wrap( fluo.newSnapshot() ) ) {
- final Bytes pcjIdBytes = snap.get(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID);
- if(pcjIdBytes == null) {
+ try(Snapshot snap = fluo.newSnapshot() ) {
+ pcjId = snap.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+ if(pcjId == null) {
throw new NotInFluoException("Could not get the PcjMetadata for queryId '" + queryId +
"' because a Rya PCJ ID not stored in the Fluo table.");
}
- pcjId = pcjIdBytes.toString();
}
// Fetch the metadata from the storage.
@@ -128,4 +126,4 @@ public class GetPcjMetadata {
super(message, cause);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
index 1f45388..b184ff3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
@@ -25,15 +25,13 @@ import java.util.Collections;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import com.google.common.base.Optional;
import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
-import org.apache.fluo.recipes.core.types.TypedTransaction;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.triple.TripleRow;
@@ -48,17 +46,10 @@ public class InsertTriples {
private static final Logger log = Logger.getLogger(InsertTriples.class);
/**
- * Wraps Fluo {@link Transaction}s so that we can write String values to them.
- */
- private static final StringTypeLayer STRING_TYPED_LAYER = new StringTypeLayer();
-
- /**
* Converts triples into the byte[] used as the row ID in Accumulo.
*/
private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
- private static final Encoder ENCODER = new StringEncoder();
-
// TODO visibility is part of RyaStatement. Put it there instead.
/**
@@ -85,10 +76,10 @@ public class InsertTriples {
checkNotNull(triples);
checkNotNull(visibility);
- try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) {
+ try(Transaction tx = fluo.newTransaction()) {
for(final RyaStatement triple : triples) {
try {
- tx.mutate().row(spoFormat(triple)).col(FluoQueryColumns.TRIPLES).set(ENCODER.encode(visibility.or("")));
+ tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or("")));
} catch (final TripleRowResolverException e) {
log.error("Could not convert a Triple into the SPO format: " + triple);
}
@@ -111,4 +102,4 @@ public class InsertTriples {
final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
return spoRow.getRow();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
index 3913e41..df1648b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
@@ -24,13 +24,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.recipes.core.types.TypedSnapshot;
/**
* Finds all queries that are being managed by this instance of Fluo that
@@ -51,7 +50,7 @@ public class ListQueryIds {
final List<String> queryIds = new ArrayList<>();
- try(TypedSnapshot snap = new StringTypeLayer().wrap( fluo.newSnapshot() )) {
+ try(Snapshot snap = fluo.newSnapshot() ) {
// Create an iterator that iterates over the QUERY_ID column.
final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_ID).build();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index 45ea9ce..f756cdb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -56,10 +56,6 @@ under the License.
<artifactId>fluo-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-recipes-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-core</artifactId>
<exclusions>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 328c653..b1af4fc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -52,8 +52,6 @@ import info.aduna.iteration.CloseableIteration;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
/**
* Updates the results of a Filter node when its child has added a new Binding
@@ -62,8 +60,6 @@ import org.apache.fluo.recipes.core.types.StringEncoder;
@ParametersAreNonnullByDefault
public class FilterResultUpdater {
- private static final Encoder ENCODER = new StringEncoder();
-
private static final BindingSetStringConverter ID_CONVERTER = new BindingSetStringConverter();
private static final VisibilityBindingSetStringConverter VALUE_CONVERTER = new VisibilityBindingSetStringConverter();
@@ -134,9 +130,9 @@ public class FilterResultUpdater {
String filterBindingSetValueString = "";
filterBindingSetValueString = VALUE_CONVERTER.convert(childBindingSet, filterVarOrder);
- final Bytes row = ENCODER.encode( filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetIdString );
+ final String row = filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetIdString;
final Column col = FluoQueryColumns.FILTER_BINDING_SET;
- final Bytes value = ENCODER.encode(filterBindingSetValueString);
+ final String value = filterBindingSetValueString;
tx.set(row, col, value);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
index 7ccfeff..513ab40 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
@@ -25,11 +25,11 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.UR
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.recipes.core.types.TypedTransaction;
import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.triple.TripleRow;
@@ -38,7 +38,6 @@ import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver;
public class IncUpdateDAO {
- private static final StringTypeLayer stl = new StringTypeLayer();
private static final WholeRowTripleResolver tr = new WholeRowTripleResolver();
public static RyaStatement deserializeTriple(final Bytes row) {
@@ -77,7 +76,7 @@ public class IncUpdateDAO {
*/
public static void addRow(final FluoClient fluoClient, final String row, final Column col, final String val) {
checkNotNull(fluoClient);
- try (TypedTransaction tx = stl.wrap(fluoClient.newTransaction())) {
+ try (Transaction tx = fluoClient.newTransaction()) {
addRow(tx, row, col, val);
tx.commit();
}
@@ -91,9 +90,9 @@ public class IncUpdateDAO {
* @param col - The Column.
* @param val - The value.
*/
- public static void addRow(final TypedTransaction tx, final String row, final Column col, final String val) {
+ public static void addRow(final Transaction tx, final String row, final Column col, final String val) {
checkNotNull(tx);
- tx.mutate().row(row).col(col).set(val);
+ tx.set(row, col, val);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 5ac69b0..73a03ca 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -49,12 +49,9 @@ import com.google.common.collect.Sets;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
/**
* Updates the results of a Join node when one of its children has added a
@@ -67,8 +64,7 @@ public class JoinResultUpdater {
private static final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter();
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
- private final Encoder encoder = new StringEncoder();
-
+
/**
* Updates the results of a Join node when one of its children has added a
* new Binding Set to its results.
@@ -132,9 +128,9 @@ public class JoinResultUpdater {
final String joinBindingSetStringId = idConverter.convert(newJoinResult, joinVarOrder);
final String joinBindingSetStringValue = valueConverter.convert(newJoinResult, joinVarOrder);
- final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId);
+ final String row = joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId;
final Column col = FluoQueryColumns.JOIN_BINDING_SET;
- final Bytes value = encoder.encode(joinBindingSetStringValue);
+ final String value = joinBindingSetStringValue;
tx.set(row, col, value);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
index 41f9025..b4800fc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java
@@ -33,10 +33,7 @@ import org.openrdf.query.Binding;
import org.openrdf.query.impl.MapBindingSet;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
/**
* Updates the results of a Query node when one of its children has added a
@@ -44,8 +41,7 @@ import org.apache.fluo.recipes.core.types.StringEncoder;
*/
@ParametersAreNonnullByDefault
public class QueryResultUpdater {
- private final Encoder encoder = new StringEncoder();
-
+
private final BindingSetStringConverter converter = new BindingSetStringConverter();
private final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter();
@@ -79,9 +75,9 @@ public class QueryResultUpdater {
final String queryBindingSetValueString = valueConverter.convert(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()), queryVarOrder);
// Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry.
- final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString);
+ final String row = queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString;
final Column col = FluoQueryColumns.QUERY_BINDING_SET;
- final Bytes value = encoder.encode(queryBindingSetValueString);
+ final String value = queryBindingSetValueString;
tx.set(row, col, value);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java
deleted file mode 100644
index aecb434..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import org.apache.fluo.recipes.core.types.StringEncoder;
-import org.apache.fluo.recipes.core.types.TypeLayer;
-
-public class StringTypeLayer extends TypeLayer {
-
- public StringTypeLayer() {
- super(new StringEncoder());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
index a7f016d..ecc39bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
@@ -20,9 +20,9 @@ package org.apache.rya.indexing.pcj.fluo.app.export;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
/**
* Exports a single Binding Set that is a new result for a SPARQL query to some
@@ -40,7 +40,7 @@ public interface IncrementalResultExporter {
* Fluo application. (not null)
* @throws ResultExportException The result could not be exported.
*/
- public void export(TypedTransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
+ public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
/**
* A result could not be exported.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
index 27530f0..a4b589f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
@@ -27,9 +27,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
/**
* Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
@@ -49,14 +48,14 @@ public class RyaResultExporter implements IncrementalResultExporter {
@Override
public void export(
- final TypedTransactionBase fluoTx,
+ final TransactionBase fluoTx,
final String queryId,
final VisibilityBindingSet result) throws ResultExportException {
checkNotNull(fluoTx);
checkNotNull(queryId);
checkNotNull(result);
- final String pcjId = fluoTx.get(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID).toString();
+ final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
try {
pcjStorage.addResults(pcjId, Collections.singleton(result));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index e344b0a..a2953cf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -37,10 +37,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
-import org.apache.fluo.recipes.core.types.TypedObserver;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
/**
* Notified when the results of a node have been updated to include a new Binding
@@ -48,9 +45,8 @@ import org.apache.fluo.recipes.core.types.TypedTransactionBase;
* results.
*/
@ParametersAreNonnullByDefault
-public abstract class BindingSetUpdater extends TypedObserver {
+public abstract class BindingSetUpdater extends AbstractObserver {
- private final Encoder encoder = new StringEncoder();
// DAO
private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@@ -72,12 +68,12 @@ public abstract class BindingSetUpdater extends TypedObserver {
public abstract Observation parseObservation(TransactionBase tx, final BindingSetRow parsedRow);
@Override
- public final void process(final TypedTransactionBase tx, final Bytes row, final Column col) {
+ public final void process(final TransactionBase tx, final Bytes row, final Column col) {
checkNotNull(tx);
checkNotNull(row);
checkNotNull(col);
- final String bindingSetString = encoder.decodeString(tx.get(row, col));
+ final String bindingSetString = tx.get(row, col).toString();
final Observation observation = parseObservation( tx, new BindingSetRow(BindingSetRow.make(row).getNodeId(), bindingSetString) );
final String observedNodeId = observation.getObservedNodeId();
final VisibilityBindingSet observedBindingSet = observation.getObservedBindingSet();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 638b1fc..0944f9b 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -38,23 +38,19 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
-
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
-import org.apache.fluo.recipes.core.types.TypedObserver;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
import mvm.rya.accumulo.utils.VisibilitySimplifier;
/**
* Performs incremental result exporting to the configured destinations.
*/
-public class QueryResultObserver extends TypedObserver {
+public class QueryResultObserver extends AbstractObserver {
private static final Logger log = Logger.getLogger(QueryResultObserver.class);
private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO();
- private static final Encoder ENCODER = new StringEncoder();
private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
/**
@@ -107,11 +103,13 @@ public class QueryResultObserver extends TypedObserver {
}
@Override
- public void process(final TypedTransactionBase tx, final Bytes row, final Column col) {
+ public void process(final TransactionBase tx, final Bytes brow, final Column col) {
+ final String row = brow.toString();
+
// Read the SPARQL query and its Binding Set from the row id.
- final String[] queryAndBindingSet = ENCODER.decodeString(row).split(NODEID_BS_DELIM);
+ final String[] queryAndBindingSet = row.split(NODEID_BS_DELIM);
final String queryId = queryAndBindingSet[0];
- final String bindingSetString = ENCODER.decodeString(tx.get(row, col));
+ final String bindingSetString = tx.gets(row, col);
// Fetch the query's Variable Order from the Fluo table.
final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 31a4c29..70f1cbd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -26,7 +26,6 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VA
import java.util.Map;
import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
-import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
@@ -35,32 +34,26 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
import com.google.common.collect.Maps;
-
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
-import org.apache.fluo.recipes.core.types.TypedObserver;
-import org.apache.fluo.recipes.core.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
/**
* An observer that matches new Triples to the Statement Patterns that are part
* of any PCJ that is being maintained. If the triple matches a pattern, then
* the new result is stored as a binding set for the pattern.
*/
-public class TripleObserver extends TypedObserver {
+public class TripleObserver extends AbstractObserver {
- private static final Encoder ENCODER = new StringEncoder();
private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO();
private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter();
- public TripleObserver() {
- super(new StringTypeLayer());
- }
+ public TripleObserver() {}
@Override
public ObservedColumn getObservedColumn() {
@@ -68,15 +61,12 @@ public class TripleObserver extends TypedObserver {
}
@Override
- public void process(final TypedTransactionBase tx, final Bytes row, final Column column) {
+ public void process(final TransactionBase tx, final Bytes brow, final Column column) {
//get string representation of triple
- final String triple = IncUpdateDAO.getTripleString(row);
- final Bytes visiBytes = tx.get(row, FluoQueryColumns.TRIPLES);
- String visibility = "";
- if(visiBytes != null) {
- visibility = ENCODER.decodeString(visiBytes);
- }
-
+ String row = brow.toString();
+ final String triple = IncUpdateDAO.getTripleString(brow);
+ String visibility = tx.gets(row, FluoQueryColumns.TRIPLES, "");
+
//get variable metadata for all SP in table
RowScanner rscanner = tx.scanner().over(Span.prefix(SP_PREFIX)).fetch(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER).byRow().build();
@@ -100,7 +90,7 @@ public class TripleObserver extends TypedObserver {
CONVERTER.convert(bindingSetString, varOrder),
visibility);
final String valueString = CONVERTER.convert(bindingSet, varOrder);
- tx.mutate().row(spID + NODEID_BS_DELIM + bindingSetString).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(valueString);
+ tx.set(spID + NODEID_BS_DELIM + bindingSetString, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueString);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index a955a53..8d41c61 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -34,8 +34,6 @@ import org.apache.fluo.api.client.SnapshotBase;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.recipes.core.types.Encoder;
-import org.apache.fluo.recipes.core.types.StringEncoder;
/**
* Reads and writes {@link FluoQuery} instances and their components to/from
@@ -44,8 +42,6 @@ import org.apache.fluo.recipes.core.types.StringEncoder;
@ParametersAreNonnullByDefault
public class FluoQueryMetadataDAO {
- private static final Encoder encoder = new StringEncoder();
-
/**
* Write an instance of {@link QueryMetadata} to the Fluo table.
*
@@ -56,11 +52,11 @@ public class FluoQueryMetadataDAO {
checkNotNull(tx);
checkNotNull(metadata);
- final Bytes rowId = encoder.encode(metadata.getNodeId());
+ final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId);
- tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
- tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, encoder.encode( metadata.getSparql() ));
- tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, encoder.encode( metadata.getChildNodeId() ));
+ tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, metadata.getSparql() );
+ tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, metadata.getChildNodeId() );
}
/**
@@ -79,18 +75,18 @@ public class FluoQueryMetadataDAO {
checkNotNull(nodeId);
// Fetch the values from the Fluo table.
- final Bytes rowId = encoder.encode(nodeId);
- final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.QUERY_VARIABLE_ORDER,
FluoQueryColumns.QUERY_SPARQL,
- FluoQueryColumns.QUERY_CHILD_NODE_ID));
+ FluoQueryColumns.QUERY_CHILD_NODE_ID);
// Return an object holding them.
- final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.QUERY_VARIABLE_ORDER));
+ final String varOrderString = values.get(FluoQueryColumns.QUERY_VARIABLE_ORDER);
final VariableOrder varOrder = new VariableOrder(varOrderString);
- final String sparql = encoder.decodeString( values.get(FluoQueryColumns.QUERY_SPARQL) );
- final String childNodeId = encoder.decodeString( values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID) );
+ final String sparql = values.get(FluoQueryColumns.QUERY_SPARQL);
+ final String childNodeId = values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID);
return QueryMetadata.builder(nodeId)
.setVariableOrder( varOrder )
@@ -108,13 +104,13 @@ public class FluoQueryMetadataDAO {
checkNotNull(tx);
checkNotNull(metadata);
- final Bytes rowId = encoder.encode(metadata.getNodeId());
+ final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId);
- tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
- tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, encoder.encode( metadata.getOriginalSparql() ));
- tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, encoder.encode( metadata.getFilterIndexWithinSparql() ));
- tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() ));
- tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, encoder.encode( metadata.getChildNodeId() ));
+ tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, metadata.getOriginalSparql() );
+ tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, metadata.getFilterIndexWithinSparql()+"" );
+ tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, metadata.getParentNodeId() );
+ tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, metadata.getChildNodeId() );
}
/**
@@ -133,22 +129,22 @@ public class FluoQueryMetadataDAO {
checkNotNull(nodeId);
// Fetch the values from the Fluo table.
- final Bytes rowId = encoder.encode(nodeId);
- final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.FILTER_VARIABLE_ORDER,
FluoQueryColumns.FILTER_ORIGINAL_SPARQL,
FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL,
FluoQueryColumns.FILTER_PARENT_NODE_ID,
- FluoQueryColumns.FILTER_CHILD_NODE_ID));
+ FluoQueryColumns.FILTER_CHILD_NODE_ID);
// Return an object holding them.
- final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.FILTER_VARIABLE_ORDER));
+ final String varOrderString = values.get(FluoQueryColumns.FILTER_VARIABLE_ORDER);
final VariableOrder varOrder = new VariableOrder(varOrderString);
- final String originalSparql = encoder.decodeString( values.get(FluoQueryColumns.FILTER_ORIGINAL_SPARQL) );
- final int filterIndexWithinSparql = encoder.decodeInteger( values.get(FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL) );
- final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.FILTER_PARENT_NODE_ID) );
- final String childNodeId = encoder.decodeString( values.get(FluoQueryColumns.FILTER_CHILD_NODE_ID) );
+ final String originalSparql = values.get(FluoQueryColumns.FILTER_ORIGINAL_SPARQL);
+ final int filterIndexWithinSparql = Integer.parseInt(values.get(FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL));
+ final String parentNodeId = values.get(FluoQueryColumns.FILTER_PARENT_NODE_ID);
+ final String childNodeId = values.get(FluoQueryColumns.FILTER_CHILD_NODE_ID);
return FilterMetadata.builder(nodeId)
.setVarOrder(varOrder)
@@ -168,13 +164,13 @@ public class FluoQueryMetadataDAO {
checkNotNull(tx);
checkNotNull(metadata);
- final Bytes rowId = encoder.encode(metadata.getNodeId());
+ final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId);
- tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
- tx.set(rowId, FluoQueryColumns.JOIN_TYPE, encoder.encode(metadata.getJoinType().toString()) );
- tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() ));
- tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, encoder.encode( metadata.getLeftChildNodeId() ));
- tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, encoder.encode( metadata.getRightChildNodeId() ));
+ tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.JOIN_TYPE, metadata.getJoinType().toString() );
+ tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, metadata.getParentNodeId() );
+ tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, metadata.getLeftChildNodeId() );
+ tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, metadata.getRightChildNodeId() );
}
/**
@@ -193,24 +189,24 @@ public class FluoQueryMetadataDAO {
checkNotNull(nodeId);
// Fetch the values from the Fluo table.
- final Bytes rowId = encoder.encode(nodeId);
- final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.JOIN_VARIABLE_ORDER,
FluoQueryColumns.JOIN_TYPE,
FluoQueryColumns.JOIN_PARENT_NODE_ID,
FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID,
- FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID));
+ FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
// Return an object holding them.
- final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER));
+ final String varOrderString = values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER);
final VariableOrder varOrder = new VariableOrder(varOrderString);
- final String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) );
+ final String joinTypeString = values.get(FluoQueryColumns.JOIN_TYPE);
final JoinType joinType = JoinType.valueOf(joinTypeString);
- final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) );
- final String leftChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) );
- final String rightChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID) );
+ final String parentNodeId = values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID);
+ final String leftChildNodeId = values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID);
+ final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID);
return JoinMetadata.builder(nodeId)
.setVariableOrder(varOrder)
@@ -230,11 +226,11 @@ public class FluoQueryMetadataDAO {
checkNotNull(tx);
checkNotNull(metadata);
- final Bytes rowId = encoder.encode(metadata.getNodeId());
+ final String rowId = metadata.getNodeId();
tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_NODE_ID, rowId);
- tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() ));
- tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PATTERN, encoder.encode( metadata.getStatementPattern() ));
- tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() ));
+ tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER, metadata.getVariableOrder().toString());
+ tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PATTERN, metadata.getStatementPattern() );
+ tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID, metadata.getParentNodeId());
}
/**
@@ -253,18 +249,18 @@ public class FluoQueryMetadataDAO {
checkNotNull(nodeId);
// Fetch the values from the Fluo table.
- final Bytes rowId = encoder.encode(nodeId);
- final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet(
+ final String rowId = nodeId;
+ final Map<Column, String> values = sx.gets(rowId,
FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER,
FluoQueryColumns.STATEMENT_PATTERN_PATTERN,
- FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID));
+ FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID);
// Return an object holding them.
- final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER));
+ final String varOrderString = values.get(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER);
final VariableOrder varOrder = new VariableOrder(varOrderString);
- final String pattern = encoder.decodeString( values.get(FluoQueryColumns.STATEMENT_PATTERN_PATTERN) );
- final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID) );
+ final String pattern = values.get(FluoQueryColumns.STATEMENT_PATTERN_PATTERN);
+ final String parentNodeId = values.get(FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID);
return StatementPatternMetadata.builder(nodeId)
.setVarOrder(varOrder)
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
index 99e2191..19bc272 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java
@@ -27,12 +27,11 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.rya.indexing.pcj.fluo.ITBase;
-import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer;
import org.junit.Test;
import com.beust.jcommander.internal.Lists;
-import org.apache.fluo.recipes.core.types.TypedTransaction;
+import org.apache.fluo.api.client.Transaction;
/**
* Integration tests the methods of {@link ListQueryIds}.
@@ -47,11 +46,11 @@ public class ListQueryIdsIT extends ITBase {
@Test
public void getQueryIds() throws AccumuloException, AccumuloSecurityException, TableExistsException {
// Store a few SPARQL/Query ID pairs in the Fluo table.
- try(TypedTransaction tx = new StringTypeLayer().wrap( fluoClient.newTransaction() )) {
- tx.mutate().row("SPARQL_3").col(QUERY_ID).set("ID_3");
- tx.mutate().row("SPARQL_1").col(QUERY_ID).set("ID_1");
- tx.mutate().row("SPARQL_4").col(QUERY_ID).set("ID_4");
- tx.mutate().row("SPARQL_2").col(QUERY_ID).set("ID_2");
+ try(Transaction tx = fluoClient.newTransaction()) {
+ tx.set("SPARQL_3", QUERY_ID, "ID_3");
+ tx.set("SPARQL_1", QUERY_ID, "ID_1");
+ tx.set("SPARQL_4", QUERY_ID, "ID_4");
+ tx.set("SPARQL_2", QUERY_ID, "ID_2");
tx.commit();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6e280c8..6ca4505 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,6 @@ under the License.
<maven.min-version>3.0.4</maven.min-version>
<fluo.version>1.0.0-incubating</fluo.version>
- <fluo-recipes.version>1.0.0-incubating-SNAPSHOT</fluo-recipes.version>
<jmh.version>1.13</jmh.version>
<jsr305.version>3.0.1</jsr305.version>
@@ -524,11 +523,6 @@ under the License.
<artifactId>fluo-mini</artifactId>
<version>${fluo.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-recipes-core</artifactId>
- <version>${fluo-recipes.version}</version>
- </dependency>
<dependency>
<groupId>org.mockito</groupId>