You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2016/02/13 16:13:10 UTC

marmotta git commit: [MARMOTTA-621] implement Java client side support for direct ASK and CONSTRUCT queries

Repository: marmotta
Updated Branches:
  refs/heads/develop 4a5e588cd -> 4ab20b3d8


[MARMOTTA-621] implement Java client side support for direct ASK and CONSTRUCT queries


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/4ab20b3d
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/4ab20b3d
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/4ab20b3d

Branch: refs/heads/develop
Commit: 4ab20b3d874ff1062d4ddb5fb52790d7fe7348a8
Parents: 4a5e588
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Sat Feb 13 16:14:03 2016 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Sat Feb 13 16:14:03 2016 +0100

----------------------------------------------------------------------
 .../ostrich/sail/OstrichSailConnection.java     | 97 ++++++++++++++++++--
 .../backend/ostrich/OstrichSailRepository.java  | 72 +++++++++++++--
 2 files changed, 153 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/4ab20b3d/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java
----------------------------------------------------------------------
diff --git a/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java b/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java
index e85fdd2..b8f6b27 100644
--- a/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java
+++ b/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java
@@ -24,6 +24,7 @@ import info.aduna.iteration.*;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import org.apache.marmotta.ostrich.client.proto.Sail;
 import org.apache.marmotta.ostrich.client.proto.SailServiceGrpc;
@@ -67,7 +68,8 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase {
     private static Logger log = LoggerFactory.getLogger(OstrichSailConnection.class);
 
     private final ManagedChannel channel;
-    private final SailServiceGrpc.SailServiceBlockingStub stub;
+    private final SailServiceGrpc.SailServiceBlockingStub blockingSailStub;
+    private final SparqlServiceGrpc.SparqlServiceBlockingStub blockingSparqlStub;
     private final SailServiceGrpc.SailServiceStub sailServiceStub;
     private final SparqlServiceGrpc.SparqlServiceStub sparqlServiceStub;
 
@@ -80,9 +82,10 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase {
         channel = ManagedChannelBuilder.forAddress(host, port)
                 .usePlaintext(true)
                 .build();
-        stub = SailServiceGrpc.newBlockingStub(channel);
+        blockingSailStub = SailServiceGrpc.newBlockingStub(channel);
         sailServiceStub = SailServiceGrpc.newStub(channel);
         sparqlServiceStub = SparqlServiceGrpc.newStub(channel);
+        blockingSparqlStub = SparqlServiceGrpc.newBlockingStub(channel);
 
         updateResponseObserver = new StreamObserver<Sail.UpdateResponse>() {
             @Override
@@ -177,11 +180,21 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase {
      * @return
      * @throws SailException
      */
-    public CloseableIteration<? extends BindingSet, QueryEvaluationException> directTupleQuery(String query) throws SailException {
+    public CloseableIteration<? extends BindingSet, QueryEvaluationException> directTupleQuery(String query, String baseUri) throws SailException {
         log.info("Committing transaction before querying ...");
         commitForQuery();
 
-        Sparql.SparqlRequest request = Sparql.SparqlRequest.newBuilder().setQuery(query).build();
+        Sparql.SparqlRequest request;
+        if (baseUri != null) {
+            request = Sparql.SparqlRequest.newBuilder()
+                    .setQuery(query)
+                    .setBaseUri(new ProtoURI(baseUri).getMessage())
+                    .build();
+        } else {
+            request = Sparql.SparqlRequest.newBuilder()
+                    .setQuery(query)
+                    .build();
+        }
 
         return new ExceptionConvertingIteration<BindingSet, QueryEvaluationException>(
                 new ConvertingIteration<Sparql.SparqlResponse, BindingSet, SailException>(
@@ -226,12 +239,80 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase {
         };
     }
 
+    /**
+     * Send a SPARQL query to a backend supporting direct SPARQL evaluation.
+     *
+     * @param query
+     * @return
+     * @throws SailException
+     */
+    public CloseableIteration<? extends Statement, QueryEvaluationException> directGraphQuery(String query, String baseUri) throws SailException {
+        log.info("Committing transaction before querying ...");
+        commitForQuery();
+
+        Sparql.SparqlRequest request;
+        if (baseUri != null) {
+            request = Sparql.SparqlRequest.newBuilder()
+                    .setQuery(query)
+                    .setBaseUri(new ProtoURI(baseUri).getMessage())
+                    .build();
+        } else {
+            request = Sparql.SparqlRequest.newBuilder()
+                    .setQuery(query)
+                    .build();
+        }
+
+        return new ExceptionConvertingIteration<Statement, QueryEvaluationException>(
+                new ConvertingIteration<Model.Statement, Statement, SailException>(
+                        new ClosableResponseStream<>(sparqlServiceStub, SparqlServiceGrpc.METHOD_GRAPH_QUERY, request)) {
+                    @Override
+                    protected Statement convert(Model.Statement sourceObject) throws SailException {
+                        return new ProtoStatement(sourceObject);
+                    }
+                }) {
+            @Override
+            protected QueryEvaluationException convert(Exception e) {
+                return new QueryEvaluationException(e);
+            }
+        };
+    }
+
+    /**
+     * Send a SPARQL query to a backend supporting direct SPARQL evaluation.
+     *
+     * @param query
+     * @return
+     * @throws SailException
+     */
+    public boolean directBooleanQuery(String query, String baseUri) throws SailException {
+        log.info("Committing transaction before querying ...");
+        commitForQuery();
+
+        Sparql.SparqlRequest request;
+        if (baseUri != null) {
+            request = Sparql.SparqlRequest.newBuilder()
+                    .setQuery(query)
+                    .setBaseUri(new ProtoURI(baseUri).getMessage())
+                    .build();
+        } else {
+            request = Sparql.SparqlRequest.newBuilder()
+                    .setQuery(query)
+                    .build();
+        }
+
+        try {
+            return blockingSparqlStub.askQuery(request).getValue();
+        } catch (StatusRuntimeException ex) {
+            throw new SailException(ex.getMessage());
+        }
+    }
+
     @Override
     protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException {
         log.info("Committing transaction before querying ...");
         commitForQuery();
 
-        return wrapResourceIterator(stub.getContexts(Empty.getDefaultInstance()));
+        return wrapResourceIterator(blockingSailStub.getContexts(Empty.getDefaultInstance()));
     }
 
     @Override
@@ -272,7 +353,7 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase {
             }
         }
 
-        Int64Value v = stub.size(builder.build());
+        Int64Value v = blockingSailStub.size(builder.build());
         return v.getValue();
     }
 
@@ -361,7 +442,7 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase {
         commitForQuery();
 
         Empty pattern = Empty.getDefaultInstance();
-        return wrapNamespaceIterator(stub.getNamespaces(pattern));
+        return wrapNamespaceIterator(blockingSailStub.getNamespaces(pattern));
     }
 
     @Override
@@ -371,7 +452,7 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase {
 
         Model.Namespace pattern = Model.Namespace.newBuilder().setPrefix(prefix).build();
         try {
-            return stub.getNamespace(pattern).getUri();
+            return blockingSailStub.getNamespace(pattern).getUri();
         } catch (io.grpc.StatusRuntimeException ex) {
             if (ex.getStatus().getCode() == Status.Code.NOT_FOUND) {
                 return null;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/4ab20b3d/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java
----------------------------------------------------------------------
diff --git a/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java b/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java
index 0dbf1e8..684ae5b 100644
--- a/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java
+++ b/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java
@@ -19,16 +19,13 @@ package org.apache.marmotta.platform.backend.ostrich;
 
 import info.aduna.iteration.CloseableIteration;
 import org.apache.marmotta.ostrich.sail.OstrichSailConnection;
+import org.openrdf.model.Statement;
 import org.openrdf.query.*;
+import org.openrdf.query.impl.GraphQueryResultImpl;
 import org.openrdf.query.impl.TupleQueryResultImpl;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.ParsedTupleQuery;
-import org.openrdf.query.parser.QueryParserUtil;
+import org.openrdf.query.parser.*;
 import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailQuery;
-import org.openrdf.repository.sail.SailRepository;
-import org.openrdf.repository.sail.SailRepositoryConnection;
-import org.openrdf.repository.sail.SailTupleQuery;
+import org.openrdf.repository.sail.*;
 import org.openrdf.sail.Sail;
 import org.openrdf.sail.SailConnection;
 import org.openrdf.sail.SailException;
@@ -37,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 
 /**
  * A wrapper SailRepository for Ostrich allowing access to direct SPARQL support.
@@ -69,7 +67,7 @@ public class OstrichSailRepository extends SailRepository {
                                     // Let Sesame still parse the query for better error messages and for the binding names.
                                     ParsedTupleQuery parsedQuery = QueryParserUtil.parseTupleQuery(ql, queryString, baseURI);
                                     OstrichSailConnection sailCon = findConnection(getConnection().getSailConnection());
-                                    bindingsIter = sailCon.directTupleQuery(queryString);
+                                    bindingsIter = sailCon.directTupleQuery(queryString, baseURI);
                                     bindingsIter = enforceMaxQueryTime(bindingsIter);
 
                                     return new TupleQueryResultImpl(new ArrayList<String>(parsedQuery.getTupleExpr().getBindingNames()), bindingsIter);
@@ -86,11 +84,69 @@ public class OstrichSailRepository extends SailRepository {
                 }
 
                 @Override
+                public SailBooleanQuery prepareBooleanQuery(final QueryLanguage ql, final String queryString, final String baseURI) throws MalformedQueryException {
+                    if (ql == QueryLanguage.SPARQL) {
+                        return new SailBooleanQuery(null, this) {
+                            @Override
+                            public boolean evaluate() throws QueryEvaluationException {
+                                try {
+                                    log.info("Running native SPARQL query: {}", queryString);
+                                    CloseableIteration<? extends BindingSet, QueryEvaluationException> bindingsIter;
+
+                                    // Let Sesame still parse the query for better error messages and for the binding names.
+                                    ParsedBooleanQuery parsedQuery = QueryParserUtil.parseBooleanQuery(ql, queryString, baseURI);
+                                    OstrichSailConnection sailCon = findConnection(getConnection().getSailConnection());
+                                    return sailCon.directBooleanQuery(queryString, baseURI);
+                                } catch (SailException e) {
+                                    throw new QueryEvaluationException(e.getMessage(), e);
+                                } catch (MalformedQueryException e) {
+                                    throw new QueryEvaluationException(e.getMessage(), e);
+                                }
+                            }
+                        };
+                    } else {
+                        return super.prepareBooleanQuery(ql, queryString, baseURI);
+                    }
+                }
+
+                @Override
+                public SailGraphQuery prepareGraphQuery(final QueryLanguage ql, final String queryString, final String baseURI) throws MalformedQueryException {
+                    if (ql == QueryLanguage.SPARQL) {
+                        return new SailGraphQuery(null, this) {
+                            @Override
+                            public GraphQueryResult evaluate() throws QueryEvaluationException {
+                                try {
+                                    log.info("Running native SPARQL query: {}", queryString);
+                                    CloseableIteration<? extends Statement, ? extends QueryEvaluationException> bindingsIter;
+
+                                    // Let Sesame still parse the query for better error messages and for the binding names.
+                                    ParsedGraphQuery parsedQuery = QueryParserUtil.parseGraphQuery(ql, queryString, baseURI);
+                                    OstrichSailConnection sailCon = findConnection(getConnection().getSailConnection());
+                                    bindingsIter = sailCon.directGraphQuery(queryString, baseURI);
+
+                                    return new GraphQueryResultImpl(new HashMap<String, String>(), bindingsIter);
+                                } catch (SailException e) {
+                                    throw new QueryEvaluationException(e.getMessage(), e);
+                                } catch (MalformedQueryException e) {
+                                    throw new QueryEvaluationException(e.getMessage(), e);
+                                }
+                            }
+                        };
+                    } else {
+                        return super.prepareGraphQuery(ql, queryString, baseURI);
+                    }
+                }
+
+                @Override
                 public SailQuery prepareQuery(QueryLanguage ql, String queryString, String baseURI) throws MalformedQueryException {
                     ParsedQuery parsedQuery = QueryParserUtil.parseQuery(ql, queryString, baseURI);
 
                     if (parsedQuery instanceof ParsedTupleQuery) {
                         return prepareTupleQuery(ql, queryString, baseURI);
+                    } else if (parsedQuery instanceof ParsedBooleanQuery) {
+                        return prepareBooleanQuery(ql, queryString, baseURI);
+                    } else if (parsedQuery instanceof ParsedGraphQuery) {
+                        return prepareGraphQuery(ql, queryString, baseURI);
                     } else {
                         return super.prepareQuery(ql, queryString, baseURI);
                     }