You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/05/14 16:10:58 UTC
[34/41] jena git commit: Fix line-ending changes.
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/query/package.html
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/query/package.html b/jena-arq/src/main/java/org/apache/jena/query/package.html
index 89c3bf3..0b7b632 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/package.html
+++ b/jena-arq/src/main/java/org/apache/jena/query/package.html
@@ -1,11 +1,11 @@
-<html>
-<head>
-</head>
-<body>
-ARQ - A query engine for Jena, implementing SPARQL.
-<p>
-ARQ is an implementation of SPARQL, an RDF query language defined
-by W3C.
-</p>
-</body>
-</html>
+<html>
+<head>
+</head>
+<body>
+ARQ - A query engine for Jena, implementing SPARQL.
+<p>
+ARQ is an implementation of SPARQL, an RDF query language defined
+by W3C.
+</p>
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
index a5c9bfc..db1cf03 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
@@ -1,152 +1,152 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang;
-
-import java.io.InputStream;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.jena.atlas.csv.CSVParser;
-import org.apache.jena.atlas.lib.IRILib ;
-import org.apache.jena.datatypes.xsd.XSDDatatype ;
-import org.apache.jena.graph.Node ;
-import org.apache.jena.graph.NodeFactory ;
-import org.apache.jena.riot.Lang;
-import org.apache.jena.riot.RDFLanguages;
-import org.apache.jena.riot.system.ErrorHandler;
-import org.apache.jena.riot.system.IRIResolver;
-import org.apache.jena.riot.system.ParserProfile;
-import org.apache.jena.riot.system.RiotLib;
-import org.apache.jena.riot.system.StreamRDF;
-
-public class LangCSV implements LangRIOT {
-
- public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/";
- public static final String CSV_ROW = CSV_PREFIX + "row";
-
- private InputStream input = null;
- private Reader reader = null;
- private String base;
- private String filename;
- private StreamRDF sink;
- private ParserProfile profile; // Warning - we don't use all of this.
-
- @Override
- public Lang getLang() {
- return RDFLanguages.CSV;
-
- }
-
- @Override
- public ParserProfile getProfile() {
- return profile;
- }
-
- @Override
- public void setProfile(ParserProfile profile) {
- this.profile = profile;
- }
-
- public LangCSV(Reader reader, String base, String filename,
- ErrorHandler errorHandler, StreamRDF sink) {
- this.reader = reader;
- this.base = base;
- this.filename = filename;
- this.sink = sink;
- this.profile = RiotLib.profile(getLang(), base, errorHandler);
- }
-
- public LangCSV(InputStream in, String base, String filename,
- ErrorHandler errorHandler, StreamRDF sink) {
- this.input = in;
- this.base = base;
- this.filename = filename;
- this.sink = sink;
- this.profile = RiotLib.profile(getLang(), base, errorHandler);
- }
-
- @Override
- public void parse() {
- sink.start();
- CSVParser parser = (input != null) ? CSVParser.create(input)
- : CSVParser.create(reader);
- ArrayList<Node> predicates = new ArrayList<Node>();
- int rowNum = 0;
- for (List<String> row : parser) {
-
- if (rowNum == 0) {
- for (String column : row) {
- String uri = IRIResolver.resolveString(filename) + "#"
- + toSafeLocalname(column);
- Node predicate = this.profile.createURI(uri, rowNum, 0);
- predicates.add(predicate);
- }
- } else {
- //Node subject = this.profile.createBlankNode(null, -1, -1);
- Node subject = caculateSubject(rowNum, filename);
- Node predicateRow = this.profile.createURI(CSV_ROW, -1, -1);
- Node objectRow = this.profile
- .createTypedLiteral((rowNum + ""),
- XSDDatatype.XSDinteger, rowNum, 0);
- sink.triple(this.profile.createTriple(subject, predicateRow,
- objectRow, rowNum, 0));
- for (int col = 0; col < row.size() && col<predicates.size(); col++) {
- Node predicate = predicates.get(col);
- String columnValue = row.get(col).trim();
- if("".equals(columnValue)){
- continue;
- }
- Node o;
- try {
- // Try for a double.
- Double.parseDouble(columnValue);
- o = NodeFactory.createLiteral(columnValue,
- XSDDatatype.XSDdouble);
- } catch (Exception e) {
- o = NodeFactory.createLiteral(columnValue);
- }
- sink.triple(this.profile.createTriple(subject, predicate,
- o, rowNum, col));
- }
-
- }
- rowNum++;
- }
- sink.finish();
-
- }
-
- public static String toSafeLocalname(String raw) {
- String ret = raw.trim();
- return encodeURIComponent(ret);
-
- }
-
- public static String encodeURIComponent(String s) {
- return IRILib.encodeUriComponent(s);
- }
-
- public static Node caculateSubject(int rowNum, String filename){
- Node subject = NodeFactory.createBlankNode();
-// String uri = IRIResolver.resolveString(filename) + "#Row_" + rowNum;
-// Node subject = NodeFactory.createURI(uri);
- return subject;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.jena.atlas.csv.CSVParser;
+import org.apache.jena.atlas.lib.IRILib ;
+import org.apache.jena.datatypes.xsd.XSDDatatype ;
+import org.apache.jena.graph.Node ;
+import org.apache.jena.graph.NodeFactory ;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+import org.apache.jena.riot.system.ErrorHandler;
+import org.apache.jena.riot.system.IRIResolver;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.system.RiotLib;
+import org.apache.jena.riot.system.StreamRDF;
+
+public class LangCSV implements LangRIOT {
+
+ public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/";
+ public static final String CSV_ROW = CSV_PREFIX + "row";
+
+ private InputStream input = null;
+ private Reader reader = null;
+ private String base;
+ private String filename;
+ private StreamRDF sink;
+ private ParserProfile profile; // Warning - we don't use all of this.
+
+ @Override
+ public Lang getLang() {
+ return RDFLanguages.CSV;
+
+ }
+
+ @Override
+ public ParserProfile getProfile() {
+ return profile;
+ }
+
+ @Override
+ public void setProfile(ParserProfile profile) {
+ this.profile = profile;
+ }
+
+ public LangCSV(Reader reader, String base, String filename,
+ ErrorHandler errorHandler, StreamRDF sink) {
+ this.reader = reader;
+ this.base = base;
+ this.filename = filename;
+ this.sink = sink;
+ this.profile = RiotLib.profile(getLang(), base, errorHandler);
+ }
+
+ public LangCSV(InputStream in, String base, String filename,
+ ErrorHandler errorHandler, StreamRDF sink) {
+ this.input = in;
+ this.base = base;
+ this.filename = filename;
+ this.sink = sink;
+ this.profile = RiotLib.profile(getLang(), base, errorHandler);
+ }
+
+ @Override
+ public void parse() {
+ sink.start();
+ CSVParser parser = (input != null) ? CSVParser.create(input)
+ : CSVParser.create(reader);
+ ArrayList<Node> predicates = new ArrayList<Node>();
+ int rowNum = 0;
+ for (List<String> row : parser) {
+
+ if (rowNum == 0) {
+ for (String column : row) {
+ String uri = IRIResolver.resolveString(filename) + "#"
+ + toSafeLocalname(column);
+ Node predicate = this.profile.createURI(uri, rowNum, 0);
+ predicates.add(predicate);
+ }
+ } else {
+ //Node subject = this.profile.createBlankNode(null, -1, -1);
+ Node subject = caculateSubject(rowNum, filename);
+ Node predicateRow = this.profile.createURI(CSV_ROW, -1, -1);
+ Node objectRow = this.profile
+ .createTypedLiteral((rowNum + ""),
+ XSDDatatype.XSDinteger, rowNum, 0);
+ sink.triple(this.profile.createTriple(subject, predicateRow,
+ objectRow, rowNum, 0));
+ for (int col = 0; col < row.size() && col<predicates.size(); col++) {
+ Node predicate = predicates.get(col);
+ String columnValue = row.get(col).trim();
+ if("".equals(columnValue)){
+ continue;
+ }
+ Node o;
+ try {
+ // Try for a double.
+ Double.parseDouble(columnValue);
+ o = NodeFactory.createLiteral(columnValue,
+ XSDDatatype.XSDdouble);
+ } catch (Exception e) {
+ o = NodeFactory.createLiteral(columnValue);
+ }
+ sink.triple(this.profile.createTriple(subject, predicate,
+ o, rowNum, col));
+ }
+
+ }
+ rowNum++;
+ }
+ sink.finish();
+
+ }
+
+ public static String toSafeLocalname(String raw) {
+ String ret = raw.trim();
+ return encodeURIComponent(ret);
+
+ }
+
+ public static String encodeURIComponent(String s) {
+ return IRILib.encodeUriComponent(s);
+ }
+
+ public static Node caculateSubject(int rowNum, String filename){
+ Node subject = NodeFactory.createBlankNode();
+// String uri = IRIResolver.resolveString(filename) + "#Row_" + rowNum;
+// Node subject = NodeFactory.createURI(uri);
+ return subject;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
index ff9ba63..5f9d6a6 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.riot.system.StreamRDF ;
-import org.apache.jena.sparql.core.Quad ;
-
-/**
- * Implementation of a producer class that sends Quads; must be connected to a {@code PipedRDFIterator<Quad>}.
- */
-public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF
-{
- /**
- * Creates a piped quads stream connected to the specified piped
- * RDF iterator. Quads written to this stream will then be
- * available as input from <code>sink</code>.
- *
- * @param sink The piped RDF iterator to connect to.
- */
- public PipedQuadsStream(PipedRDFIterator<Quad> sink)
- {
- super(sink) ;
- }
-
- @Override
- public void triple(Triple triple)
- {
- // Triples are discarded
- }
-
- @Override
- public void quad(Quad quad)
- {
- receive(quad) ;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends Quads; must be connected to a {@code PipedRDFIterator<Quad>}.
+ */
+public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF
+{
+ /**
+ * Creates a piped quads stream connected to the specified piped
+ * RDF iterator. Quads written to this stream will then be
+ * available as input from <code>sink</code>.
+ *
+ * @param sink The piped RDF iterator to connect to.
+ */
+ public PipedQuadsStream(PipedRDFIterator<Quad> sink)
+ {
+ super(sink) ;
+ }
+
+ @Override
+ public void triple(Triple triple)
+ {
+ // Triples are discarded
+ }
+
+ @Override
+ public void quad(Quad quad)
+ {
+ receive(quad) ;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
index a79ae6f..3259b9d 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
@@ -1,392 +1,392 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.jena.atlas.lib.Closeable;
-import org.apache.jena.riot.RiotException;
-import org.apache.jena.riot.system.PrefixMap;
-import org.apache.jena.riot.system.PrefixMapFactory;
-
-/**
- * <p>
- * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream}
- * implementation; the piped iterator then provides whatever RDF primitives are
- * written to the {@code PipedRDFStream}
- * </p>
- * <p>
- * Typically, data is read from a {@code PipedRDFIterator} by one thread (the
- * consumer) and data is written to the corresponding {@code PipedRDFStream} by
- * some other thread (the producer). Attempting to use both objects from a
- * single thread is not recommended, as it may deadlock the thread. The
- * {@code PipedRDFIterator} contains a buffer, decoupling read operations from
- * write operations, within limits.
- * </p>
- * <p>
- * Inspired by Java's {@link java.io.PipedInputStream} and
- * {@link java.io.PipedOutputStream}
- * </p>
- *
- * @param <T>
- * The type of the RDF primitive, should be one of {@code Triple},
- * {@code Quad}, or {@code Tuple<Node>}
- *
- * @see PipedTriplesStream
- * @see PipedQuadsStream
- * @see PipedTuplesStream
- */
-public class PipedRDFIterator<T> implements Iterator<T>, Closeable {
- /**
- * Constant for default buffer size
- */
- public static final int DEFAULT_BUFFER_SIZE = 10000;
-
- /**
- * Constant for default poll timeout in milliseconds, used to stop the
- * consumer deadlocking in certain circumstances
- */
- public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second
- /**
- * Constant for max number of failed poll attempts before the producer will
- * be declared as dead
- */
- public static final int DEFAULT_MAX_POLLS = 10;
-
- private final BlockingQueue<T> queue;
-
- @SuppressWarnings("unchecked")
- private final T endMarker = (T) new Object();
-
- private volatile boolean closedByConsumer = false;
- private volatile boolean closedByProducer = false;
- private volatile boolean finished = false;
- private volatile boolean threadReused = false;
- private volatile Thread consumerThread;
- private volatile Thread producerThread;
-
- private boolean connected = false;
- private int pollTimeout = DEFAULT_POLL_TIMEOUT;
- private int maxPolls = DEFAULT_MAX_POLLS;
-
- private T slot;
-
- private final Object lock = new Object(); // protects baseIri and prefixes
- private String baseIri;
- private final PrefixMap prefixes = PrefixMapFactory.createForInput();
-
- /**
- * Creates a new piped RDF iterator with the default buffer size of
- * {@code DEFAULT_BUFFER_SIZE}.
- * <p>
- * Buffer size must be chosen carefully in order to avoid performance
- * problems, if you set the buffer size too low you will experience a lot of
- * blocked calls so it will take longer to consume the data from the
- * iterator. For best performance the buffer size should be at least 10% of
- * the expected input size though you may need to tune this depending on how
- * fast your consumer thread is.
- * </p>
- */
- public PipedRDFIterator() {
- this(DEFAULT_BUFFER_SIZE);
- }
-
- /**
- * Creates a new piped RDF iterator
- * <p>
- * Buffer size must be chosen carefully in order to avoid performance
- * problems, if you set the buffer size too low you will experience a lot of
- * blocked calls so it will take longer to consume the data from the
- * iterator. For best performance the buffer size should be roughly 10% of
- * the expected input size though you may need to tune this depending on how
- * fast your consumer thread is.
- * </p>
- *
- * @param bufferSize
- * Buffer size
- */
- public PipedRDFIterator(int bufferSize) {
- this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
- }
-
- /**
- * Creates a new piped RDF iterator
- * <p>
- * Buffer size must be chosen carefully in order to avoid performance
- * problems, if you set the buffer size too low you will experience a lot of
- * blocked calls so it will take longer to consume the data from the
- * iterator. For best performance the buffer size should be roughly 10% of
- * the expected input size though you may need to tune this depending on how
- * fast your consumer thread is.
- * </p>
- * <p>
- * The fair parameter controls whether the locking policy used for the
- * buffer is fair. When enabled this reduces throughput but also reduces the
- * chance of thread starvation. This likely need only be set to {@code true}
- * if there will be multiple consumers.
- * </p>
- *
- * @param bufferSize
- * Buffer size
- * @param fair
- * Whether the buffer should use a fair locking policy
- */
- public PipedRDFIterator(int bufferSize, boolean fair) {
- this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
- }
-
- /**
- * Creates a new piped RDF iterator
- * <p>
- * Buffer size must be chosen carefully in order to avoid performance
- * problems, if you set the buffer size too low you will experience a lot of
- * blocked calls so it will take longer to consume the data from the
- * iterator. For best performance the buffer size should be roughly 10% of
- * the expected input size though you may need to tune this depending on how
- * fast your consumer thread is.
- * </p>
- * <p>
- * The {@code fair} parameter controls whether the locking policy used for
- * the buffer is fair. When enabled this reduces throughput but also reduces
- * the chance of thread starvation. This likely need only be set to
- * {@code true} if there will be multiple consumers.
- * </p>
- * <p>
- * The {@code pollTimeout} parameter controls how long each poll attempt
- * waits for data to be produced. This prevents the consumer thread from
- * blocking indefinitely and allows it to detect various potential deadlock
- * conditions e.g. dead producer thread, another consumer closed the
- * iterator etc. and errors out accordingly. It is unlikely that you will
- * ever need to adjust this from the default value provided by
- * {@link #DEFAULT_POLL_TIMEOUT}.
- * </p>
- * <p>
- * The {@code maxPolls} parameter controls how many poll attempts will be
- * made by a single consumer thread within the context of a single call to
- * {@link #hasNext()} before the iterator declares the producer to be dead
- * and errors out accordingly. You may need to adjust this if you have a
- * slow producer thread or many consumer threads.
- * </p>
- *
- * @param bufferSize
- * Buffer size
- * @param fair
- * Whether the buffer should use a fair locking policy
- * @param pollTimeout
- * Poll timeout in milliseconds
- * @param maxPolls
- * Max poll attempts
- */
- public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, int maxPolls) {
- if (pollTimeout <= 0)
- throw new IllegalArgumentException("Poll Timeout must be > 0");
- if (maxPolls <= 0)
- throw new IllegalArgumentException("Max Poll attempts must be > 0");
- this.queue = new ArrayBlockingQueue<>(bufferSize, fair);
- this.pollTimeout = pollTimeout;
- this.maxPolls = maxPolls;
- }
-
- @Override
- public boolean hasNext() {
- if (!connected)
- throw new IllegalStateException("Pipe not connected");
-
- if (closedByConsumer)
- throw new RiotException("Pipe closed");
-
- if (finished)
- return false;
-
- consumerThread = Thread.currentThread();
-
- // Depending on how code and/or the JVM schedules the threads involved
- // there is a scenario that exists where a producer can finish/die
- // before theconsumer is started and the consumer is scheduled onto the
- // same thread thus resulting in a deadlock on the consumer because it
- // will never be able to detect that the producer died
- // In this scenario we need to set a special flag to indicate the
- // possibility
- if (producerThread != null && producerThread == consumerThread)
- threadReused = true;
-
- if (slot != null)
- return true;
-
- int attempts = 0;
- while (true) {
- attempts++;
- try {
- slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new CancellationException();
- }
-
- if (null != slot)
- break;
-
- // If the producer thread died and did not call finish() then
- // declare this pipe to be "broken"
- // Since check is after the break, we will drain as much as possible
- // out of the queue before throwing this exception
- if (threadReused || (producerThread != null && !producerThread.isAlive() && !closedByProducer)) {
- closedByConsumer = true;
- throw new RiotException("Producer dead");
- }
-
- // Need to check this inside the loop as otherwise outside code that
- // attempts to break the deadlock by causing close() on the iterator
- // cannot do so
- if (closedByConsumer)
- throw new RiotException("Pipe closed");
-
- // Need to check whether polling attempts have been exceeded
- // If so declare the producer dead and exit
- if (attempts >= this.maxPolls) {
- closedByConsumer = true;
- if (producerThread != null) {
- throw new RiotException(
- "Producer failed to produce any data within the specified number of polling attempts, declaring producer dead");
- } else {
- throw new RiotException("Producer failed to ever call start(), declaring producer dead");
- }
- }
- }
-
- // When the end marker is seen set slot to null
- if (slot == endMarker) {
- finished = true;
- slot = null;
- return false;
- }
- return true;
- }
-
- @Override
- public T next() {
- if (!hasNext())
- throw new NoSuchElementException();
- T item = slot;
- slot = null;
- return item;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- private void checkStateForReceive() {
- if (closedByProducer || closedByConsumer) {
- throw new RiotException("Pipe closed");
- } else if (consumerThread != null && !consumerThread.isAlive()) {
- throw new RiotException("Consumer dead");
- }
- }
-
- protected void connect() {
- this.connected = true;
- }
-
- protected void receive(T t) {
- checkStateForReceive();
- producerThread = Thread.currentThread();
-
- try {
- queue.put(t);
- } catch (InterruptedException e) {
- throw new CancellationException();
- }
- }
-
- protected void base(String base) {
- synchronized (lock) {
- this.baseIri = base;
- }
- }
-
- /**
- * Gets the most recently seen Base IRI
- *
- * @return Base IRI
- */
- public String getBaseIri() {
- synchronized (lock) {
- return baseIri;
- }
- }
-
- protected void prefix(String prefix, String iri) {
- synchronized (lock) {
- prefixes.add(prefix, iri);
- }
- }
-
- /**
- * Gets the prefix map which contains the prefixes seen so far in the stream
- *
- * @return Prefix Map
- */
- public PrefixMap getPrefixes() {
- synchronized (lock) {
- // Need to return a copy since PrefixMap is not concurrent
- return PrefixMapFactory.create(this.prefixes);
- }
- }
-
- /**
- * Should be called by the producer when it begins writing to the iterator.
- * If the producer fails to call this for whatever reason and never produces
- * any output or calls {@code finish()} consumers may be blocked for a short
- * period before they detect this state and error out.
- */
- protected void start() {
- // Track the producer thread in case it never delivers us anything and
- // dies before calling finish
- producerThread = Thread.currentThread();
- }
-
- /**
- * Should be called by the producer when it has finished writing to the
- * iterator. If the producer fails to call this for whatever reason
- * consumers may be blocked for a short period before they detect this state
- * and error out.
- */
- protected void finish() {
- if ( closedByProducer )
- return ;
- receive(endMarker);
- closedByProducer = true;
- }
-
- /**
- * May be called by the consumer when it is finished reading from the
- * iterator, if the producer thread has not finished it will receive an
- * error the next time it tries to write to the iterator
- */
- @Override
- public void close() {
- closedByConsumer = true;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jena.atlas.lib.Closeable;
+import org.apache.jena.riot.RiotException;
+import org.apache.jena.riot.system.PrefixMap;
+import org.apache.jena.riot.system.PrefixMapFactory;
+
+/**
+ * <p>
+ * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream}
+ * implementation; the piped iterator then provides whatever RDF primitives are
+ * written to the {@code PipedRDFStream}
+ * </p>
+ * <p>
+ * Typically, data is read from a {@code PipedRDFIterator} by one thread (the
+ * consumer) and data is written to the corresponding {@code PipedRDFStream} by
+ * some other thread (the producer). Attempting to use both objects from a
+ * single thread is not recommended, as it may deadlock the thread. The
+ * {@code PipedRDFIterator} contains a buffer, decoupling read operations from
+ * write operations, within limits.
+ * </p>
+ * <p>
+ * Inspired by Java's {@link java.io.PipedInputStream} and
+ * {@link java.io.PipedOutputStream}
+ * </p>
+ *
+ * @param <T>
+ * The type of the RDF primitive, should be one of {@code Triple},
+ * {@code Quad}, or {@code Tuple<Node>}
+ *
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public class PipedRDFIterator<T> implements Iterator<T>, Closeable {
+ /**
+ * Constant for default buffer size
+ */
+ public static final int DEFAULT_BUFFER_SIZE = 10000;
+
+ /**
+ * Constant for default poll timeout in milliseconds, used to stop the
+ * consumer deadlocking in certain circumstances
+ */
+ public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second
+ /**
+ * Constant for max number of failed poll attempts before the producer will
+ * be declared as dead
+ */
+ public static final int DEFAULT_MAX_POLLS = 10;
+
+ private final BlockingQueue<T> queue;
+
+ @SuppressWarnings("unchecked")
+ private final T endMarker = (T) new Object();
+
+ private volatile boolean closedByConsumer = false;
+ private volatile boolean closedByProducer = false;
+ private volatile boolean finished = false;
+ private volatile boolean threadReused = false;
+ private volatile Thread consumerThread;
+ private volatile Thread producerThread;
+
+ private boolean connected = false;
+ private int pollTimeout = DEFAULT_POLL_TIMEOUT;
+ private int maxPolls = DEFAULT_MAX_POLLS;
+
+ private T slot;
+
+ private final Object lock = new Object(); // protects baseIri and prefixes
+ private String baseIri;
+ private final PrefixMap prefixes = PrefixMapFactory.createForInput();
+
+ /**
+ * Creates a new piped RDF iterator with the default buffer size of
+ * {@code DEFAULT_BUFFER_SIZE}.
+ * <p>
+ * Buffer size must be chosen carefully in order to avoid performance
+ * problems, if you set the buffer size too low you will experience a lot of
+ * blocked calls so it will take longer to consume the data from the
+ * iterator. For best performance the buffer size should be at least 10% of
+ * the expected input size though you may need to tune this depending on how
+ * fast your consumer thread is.
+ * </p>
+ */
+ public PipedRDFIterator() {
+ this(DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a new piped RDF iterator
+ * <p>
+ * Buffer size must be chosen carefully in order to avoid performance
+ * problems, if you set the buffer size too low you will experience a lot of
+ * blocked calls so it will take longer to consume the data from the
+ * iterator. For best performance the buffer size should be roughly 10% of
+ * the expected input size though you may need to tune this depending on how
+ * fast your consumer thread is.
+ * </p>
+ *
+ * @param bufferSize
+ * Buffer size
+ */
+ public PipedRDFIterator(int bufferSize) {
+ this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
+ }
+
+ /**
+ * Creates a new piped RDF iterator
+ * <p>
+ * Buffer size must be chosen carefully in order to avoid performance
+ * problems, if you set the buffer size too low you will experience a lot of
+ * blocked calls so it will take longer to consume the data from the
+ * iterator. For best performance the buffer size should be roughly 10% of
+ * the expected input size though you may need to tune this depending on how
+ * fast your consumer thread is.
+ * </p>
+ * <p>
+ * The fair parameter controls whether the locking policy used for the
+ * buffer is fair. When enabled this reduces throughput but also reduces the
+ * chance of thread starvation. This likely need only be set to {@code true}
+ * if there will be multiple consumers.
+ * </p>
+ *
+ * @param bufferSize
+ * Buffer size
+ * @param fair
+ * Whether the buffer should use a fair locking policy
+ */
+ public PipedRDFIterator(int bufferSize, boolean fair) {
+ this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
+ }
+
+ /**
+ * Creates a new piped RDF iterator
+ * <p>
+ * Buffer size must be chosen carefully in order to avoid performance
+ * problems, if you set the buffer size too low you will experience a lot of
+ * blocked calls so it will take longer to consume the data from the
+ * iterator. For best performance the buffer size should be roughly 10% of
+ * the expected input size though you may need to tune this depending on how
+ * fast your consumer thread is.
+ * </p>
+ * <p>
+ * The {@code fair} parameter controls whether the locking policy used for
+ * the buffer is fair. When enabled this reduces throughput but also reduces
+ * the chance of thread starvation. This likely need only be set to
+ * {@code true} if there will be multiple consumers.
+ * </p>
+ * <p>
+ * The {@code pollTimeout} parameter controls how long each poll attempt
+ * waits for data to be produced. This prevents the consumer thread from
+ * blocking indefinitely and allows it to detect various potential deadlock
+ * conditions e.g. dead producer thread, another consumer closed the
+ * iterator etc. and errors out accordingly. It is unlikely that you will
+ * ever need to adjust this from the default value provided by
+ * {@link #DEFAULT_POLL_TIMEOUT}.
+ * </p>
+ * <p>
+ * The {@code maxPolls} parameter controls how many poll attempts will be
+ * made by a single consumer thread within the context of a single call to
+ * {@link #hasNext()} before the iterator declares the producer to be dead
+ * and errors out accordingly. You may need to adjust this if you have a
+ * slow producer thread or many consumer threads.
+ * </p>
+ *
+ * @param bufferSize
+ * Buffer size
+ * @param fair
+ * Whether the buffer should use a fair locking policy
+ * @param pollTimeout
+ * Poll timeout in milliseconds
+ * @param maxPolls
+ * Max poll attempts
+ */
+ public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, int maxPolls) {
+ if (pollTimeout <= 0)
+ throw new IllegalArgumentException("Poll Timeout must be > 0");
+ if (maxPolls <= 0)
+ throw new IllegalArgumentException("Max Poll attempts must be > 0");
+ this.queue = new ArrayBlockingQueue<>(bufferSize, fair);
+ this.pollTimeout = pollTimeout;
+ this.maxPolls = maxPolls;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!connected)
+ throw new IllegalStateException("Pipe not connected");
+
+ if (closedByConsumer)
+ throw new RiotException("Pipe closed");
+
+ if (finished)
+ return false;
+
+ consumerThread = Thread.currentThread();
+
+ // Depending on how code and/or the JVM schedules the threads involved
+ // there is a scenario that exists where a producer can finish/die
+ // before theconsumer is started and the consumer is scheduled onto the
+ // same thread thus resulting in a deadlock on the consumer because it
+ // will never be able to detect that the producer died
+ // In this scenario we need to set a special flag to indicate the
+ // possibility
+ if (producerThread != null && producerThread == consumerThread)
+ threadReused = true;
+
+ if (slot != null)
+ return true;
+
+ int attempts = 0;
+ while (true) {
+ attempts++;
+ try {
+ slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new CancellationException();
+ }
+
+ if (null != slot)
+ break;
+
+ // If the producer thread died and did not call finish() then
+ // declare this pipe to be "broken"
+ // Since check is after the break, we will drain as much as possible
+ // out of the queue before throwing this exception
+ if (threadReused || (producerThread != null && !producerThread.isAlive() && !closedByProducer)) {
+ closedByConsumer = true;
+ throw new RiotException("Producer dead");
+ }
+
+ // Need to check this inside the loop as otherwise outside code that
+ // attempts to break the deadlock by causing close() on the iterator
+ // cannot do so
+ if (closedByConsumer)
+ throw new RiotException("Pipe closed");
+
+ // Need to check whether polling attempts have been exceeded
+ // If so declare the producer dead and exit
+ if (attempts >= this.maxPolls) {
+ closedByConsumer = true;
+ if (producerThread != null) {
+ throw new RiotException(
+ "Producer failed to produce any data within the specified number of polling attempts, declaring producer dead");
+ } else {
+ throw new RiotException("Producer failed to ever call start(), declaring producer dead");
+ }
+ }
+ }
+
+ // When the end marker is seen set slot to null
+ if (slot == endMarker) {
+ finished = true;
+ slot = null;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ T item = slot;
+ slot = null;
+ return item;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void checkStateForReceive() {
+ if (closedByProducer || closedByConsumer) {
+ throw new RiotException("Pipe closed");
+ } else if (consumerThread != null && !consumerThread.isAlive()) {
+ throw new RiotException("Consumer dead");
+ }
+ }
+
+ protected void connect() {
+ this.connected = true;
+ }
+
+ protected void receive(T t) {
+ checkStateForReceive();
+ producerThread = Thread.currentThread();
+
+ try {
+ queue.put(t);
+ } catch (InterruptedException e) {
+ throw new CancellationException();
+ }
+ }
+
+ protected void base(String base) {
+ synchronized (lock) {
+ this.baseIri = base;
+ }
+ }
+
+ /**
+ * Gets the most recently seen Base IRI
+ *
+ * @return Base IRI
+ */
+ public String getBaseIri() {
+ synchronized (lock) {
+ return baseIri;
+ }
+ }
+
+ protected void prefix(String prefix, String iri) {
+ synchronized (lock) {
+ prefixes.add(prefix, iri);
+ }
+ }
+
+ /**
+ * Gets the prefix map which contains the prefixes seen so far in the stream
+ *
+ * @return Prefix Map
+ */
+ public PrefixMap getPrefixes() {
+ synchronized (lock) {
+ // Need to return a copy since PrefixMap is not concurrent
+ return PrefixMapFactory.create(this.prefixes);
+ }
+ }
+
+ /**
+ * Should be called by the producer when it begins writing to the iterator.
+ * If the producer fails to call this for whatever reason and never produces
+ * any output or calls {@code finish()} consumers may be blocked for a short
+ * period before they detect this state and error out.
+ */
+ protected void start() {
+ // Track the producer thread in case it never delivers us anything and
+ // dies before calling finish
+ producerThread = Thread.currentThread();
+ }
+
+ /**
+ * Should be called by the producer when it has finished writing to the
+ * iterator. If the producer fails to call this for whatever reason
+ * consumers may be blocked for a short period before they detect this state
+ * and error out.
+ */
+ protected void finish() {
+ if ( closedByProducer )
+ return ;
+ receive(endMarker);
+ closedByProducer = true;
+ }
+
+ /**
+ * May be called by the consumer when it is finished reading from the
+ * iterator, if the producer thread has not finished it will receive an
+ * error the next time it tries to write to the iterator
+ */
+ @Override
+ public void close() {
+ closedByConsumer = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
index 40877e4..6406204 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
@@ -1,70 +1,70 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.riot.system.StreamRDF ;
-
-/**
- * Abstract implementation of a producer class that implements {@code StreamRDF};
- * use one of the concrete implementations that match the RDF primitive you are using.
- * @param <T> Type corresponding to a supported RDF primitive
- *
- * @see PipedTriplesStream
- * @see PipedQuadsStream
- * @see PipedTuplesStream
- */
-public abstract class PipedRDFStream<T> implements StreamRDF
-{
- private final PipedRDFIterator<T> sink ;
-
- protected PipedRDFStream(PipedRDFIterator<T> sink)
- {
- this.sink = sink ;
- this.sink.connect();
- }
-
- protected void receive(T t)
- {
- sink.receive(t) ;
- }
-
- @Override
- public void base(String base)
- {
- sink.base(base) ;
- }
-
- @Override
- public void prefix(String prefix, String iri)
- {
- sink.prefix(prefix, iri) ;
- }
-
- @Override
- public void start()
- {
- sink.start() ;
- }
-
- @Override
- public void finish()
- {
- sink.finish() ;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.riot.system.StreamRDF ;
+
+/**
+ * Abstract implementation of a producer class that implements {@code StreamRDF};
+ * use one of the concrete implementations that match the RDF primitive you are using.
+ * @param <T> Type corresponding to a supported RDF primitive
+ *
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public abstract class PipedRDFStream<T> implements StreamRDF
+{
+ private final PipedRDFIterator<T> sink ;
+
+ protected PipedRDFStream(PipedRDFIterator<T> sink)
+ {
+ this.sink = sink ;
+ this.sink.connect();
+ }
+
+ protected void receive(T t)
+ {
+ sink.receive(t) ;
+ }
+
+ @Override
+ public void base(String base)
+ {
+ sink.base(base) ;
+ }
+
+ @Override
+ public void prefix(String prefix, String iri)
+ {
+ sink.prefix(prefix, iri) ;
+ }
+
+ @Override
+ public void start()
+ {
+ sink.start() ;
+ }
+
+ @Override
+ public void finish()
+ {
+ sink.finish() ;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
index 270d59e..c5c2dfe 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.riot.system.StreamRDF ;
-import org.apache.jena.sparql.core.Quad ;
-
-/**
- * Implementation of a producer class that sends Triples; must be connected to a {@code PipedRDFIterator<Triple>}.
- */
-public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF
-{
- /**
- * Creates a piped triples stream connected to the specified piped
- * RDF iterator. Triples written to this stream will then be
- * available as input from <code>sink</code>.
- *
- * @param sink The piped RDF iterator to connect to.
- */
- public PipedTriplesStream(PipedRDFIterator<Triple> sink)
- {
- super(sink) ;
- }
-
- @Override
- public void triple(Triple triple)
- {
- receive(triple) ;
- }
-
- @Override
- public void quad(Quad quad)
- {
- // Quads are discarded
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends Triples; must be connected to a {@code PipedRDFIterator<Triple>}.
+ */
+public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF
+{
+ /**
+ * Creates a piped triples stream connected to the specified piped
+ * RDF iterator. Triples written to this stream will then be
+ * available as input from <code>sink</code>.
+ *
+ * @param sink The piped RDF iterator to connect to.
+ */
+ public PipedTriplesStream(PipedRDFIterator<Triple> sink)
+ {
+ super(sink) ;
+ }
+
+ @Override
+ public void triple(Triple triple)
+ {
+ receive(triple) ;
+ }
+
+ @Override
+ public void quad(Quad quad)
+ {
+ // Quads are discarded
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
index f1a63d3..4bdb728 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
@@ -1,55 +1,55 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.atlas.lib.tuple.Tuple ;
-import org.apache.jena.graph.Node ;
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.riot.system.StreamRDF ;
-import org.apache.jena.sparql.core.Quad ;
-
-/**
- * Implementation of a producer class that sends @{code Tuple<Node>}; must be connected to a {@code PipedRDFIterator<Tuple<Node>}.
- */
-public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF
-{
- /**
- * Creates a piped tuples stream connected to the specified piped
- * RDF iterator. Tuples written to this stream will then be
- * available as input from <code>sink</code>.
- *
- * @param sink The piped RDF iterator to connect to.
- */
- public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
- {
- super(sink) ;
- }
-
- @Override
- public void triple(Triple triple)
- {
- // Triples are discarded
- }
-
- @Override
- public void quad(Quad quad)
- {
- // Quads are discarded
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.tuple.Tuple ;
+import org.apache.jena.graph.Node ;
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends @{code Tuple<Node>}; must be connected to a {@code PipedRDFIterator<Tuple<Node>}.
+ */
+public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF
+{
+ /**
+ * Creates a piped tuples stream connected to the specified piped
+ * RDF iterator. Tuples written to this stream will then be
+ * available as input from <code>sink</code>.
+ *
+ * @param sink The piped RDF iterator to connect to.
+ */
+ public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
+ {
+ super(sink) ;
+ }
+
+ @Override
+ public void triple(Triple triple)
+ {
+ // Triples are discarded
+ }
+
+ @Override
+ public void quad(Quad quad)
+ {
+ // Quads are discarded
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java b/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
index d27e46c..0841c50 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
@@ -1,137 +1,137 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.out ;
-
-import java.io.OutputStream ;
-import java.util.Objects;
-
-import org.apache.jena.atlas.io.IndentedWriter ;
-import org.apache.jena.atlas.lib.Closeable ;
-import org.apache.jena.atlas.lib.Sink ;
-import org.apache.jena.graph.Node ;
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.sparql.core.Quad ;
-import org.apache.jena.sparql.serializer.SerializationContext ;
-import org.apache.jena.sparql.util.FmtUtils ;
-
-/**
- * A class that print quads, SPARQL style (maybe good for Trig too?)
- */
-public class SinkQuadBracedOutput implements Sink<Quad>, Closeable
-{
- protected static final int BLOCK_INDENT = 2 ;
-
- protected final IndentedWriter out ;
- protected final SerializationContext sCxt ;
- protected boolean opened = false ;
-
- protected Node currentGraph ;
-
- public SinkQuadBracedOutput(OutputStream out) {
- this(out, null) ;
- }
-
- public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) {
- this(new IndentedWriter(out), sCxt) ;
- }
-
- public SinkQuadBracedOutput(IndentedWriter out, SerializationContext sCxt) {
- if ( out == null ) { throw new IllegalArgumentException("out may not be null") ; }
-
- if ( sCxt == null ) {
- sCxt = new SerializationContext() ;
- }
-
- this.out = out ;
- this.sCxt = sCxt ;
- }
-
- public void open() {
- out.println("{") ;
- out.incIndent(BLOCK_INDENT) ;
- opened = true ;
- }
-
- private void checkOpen() {
- if ( !opened ) { throw new IllegalStateException("SinkQuadBracedOutput is not opened. Call open() first.") ; }
- }
-
- @Override
- public void send(Quad quad) {
- send(quad.getGraph(), quad.asTriple()) ;
- }
-
- public void send(Node graphName, Triple triple) {
- checkOpen() ;
- if ( Quad.isDefaultGraph(graphName) ) {
- graphName = null ;
- }
-
- if ( !Objects.equals(currentGraph, graphName) ) {
- if ( null != currentGraph ) {
- out.decIndent(BLOCK_INDENT) ;
- out.println("}") ;
- }
-
- if ( null != graphName ) {
- out.print("GRAPH ") ;
- output(graphName) ;
- out.println(" {") ;
- out.incIndent(BLOCK_INDENT) ;
- }
- }
-
- output(triple) ;
- out.println(" .") ;
-
- currentGraph = graphName ;
- }
-
- private void output(Node node) {
- String n = FmtUtils.stringForNode(node, sCxt) ;
- out.print(n) ;
- }
-
- private void output(Triple triple) {
- String ts = FmtUtils.stringForTriple(triple, sCxt) ;
- out.print(ts) ;
- }
-
- @Override
- public void flush() {
- out.flush() ;
- }
-
- @Override
- public void close() {
- if ( opened ) {
- if ( null != currentGraph ) {
- out.decIndent(BLOCK_INDENT) ;
- out.println("}") ;
- }
-
- out.decIndent(BLOCK_INDENT) ;
- out.print("}") ;
-
- // Since we didn't create the OutputStream, we'll just flush it
- flush() ;
- opened = false ;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.out ;
+
+import java.io.OutputStream ;
+import java.util.Objects;
+
+import org.apache.jena.atlas.io.IndentedWriter ;
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.atlas.lib.Sink ;
+import org.apache.jena.graph.Node ;
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.sparql.core.Quad ;
+import org.apache.jena.sparql.serializer.SerializationContext ;
+import org.apache.jena.sparql.util.FmtUtils ;
+
+/**
+ * A class that print quads, SPARQL style (maybe good for Trig too?)
+ */
+public class SinkQuadBracedOutput implements Sink<Quad>, Closeable
+{
+ protected static final int BLOCK_INDENT = 2 ;
+
+ protected final IndentedWriter out ;
+ protected final SerializationContext sCxt ;
+ protected boolean opened = false ;
+
+ protected Node currentGraph ;
+
+ public SinkQuadBracedOutput(OutputStream out) {
+ this(out, null) ;
+ }
+
+ public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) {
+ this(new IndentedWriter(out), sCxt) ;
+ }
+
+ public SinkQuadBracedOutput(IndentedWriter out, SerializationContext sCxt) {
+ if ( out == null ) { throw new IllegalArgumentException("out may not be null") ; }
+
+ if ( sCxt == null ) {
+ sCxt = new SerializationContext() ;
+ }
+
+ this.out = out ;
+ this.sCxt = sCxt ;
+ }
+
+ public void open() {
+ out.println("{") ;
+ out.incIndent(BLOCK_INDENT) ;
+ opened = true ;
+ }
+
+ private void checkOpen() {
+ if ( !opened ) { throw new IllegalStateException("SinkQuadBracedOutput is not opened. Call open() first.") ; }
+ }
+
+ @Override
+ public void send(Quad quad) {
+ send(quad.getGraph(), quad.asTriple()) ;
+ }
+
+ public void send(Node graphName, Triple triple) {
+ checkOpen() ;
+ if ( Quad.isDefaultGraph(graphName) ) {
+ graphName = null ;
+ }
+
+ if ( !Objects.equals(currentGraph, graphName) ) {
+ if ( null != currentGraph ) {
+ out.decIndent(BLOCK_INDENT) ;
+ out.println("}") ;
+ }
+
+ if ( null != graphName ) {
+ out.print("GRAPH ") ;
+ output(graphName) ;
+ out.println(" {") ;
+ out.incIndent(BLOCK_INDENT) ;
+ }
+ }
+
+ output(triple) ;
+ out.println(" .") ;
+
+ currentGraph = graphName ;
+ }
+
+ private void output(Node node) {
+ String n = FmtUtils.stringForNode(node, sCxt) ;
+ out.print(n) ;
+ }
+
+ private void output(Triple triple) {
+ String ts = FmtUtils.stringForTriple(triple, sCxt) ;
+ out.print(ts) ;
+ }
+
+ @Override
+ public void flush() {
+ out.flush() ;
+ }
+
+ @Override
+ public void close() {
+ if ( opened ) {
+ if ( null != currentGraph ) {
+ out.decIndent(BLOCK_INDENT) ;
+ out.println("}") ;
+ }
+
+ out.decIndent(BLOCK_INDENT) ;
+ out.print("}") ;
+
+ // Since we didn't create the OutputStream, we'll just flush it
+ flush() ;
+ opened = false ;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
index 4d0d414..0fb4f6e 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
@@ -1,218 +1,218 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.engine.index;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.jena.graph.Node ;
-import org.apache.jena.sparql.core.Var ;
-import org.apache.jena.sparql.engine.QueryIterator ;
-import org.apache.jena.sparql.engine.binding.Binding ;
-
-/**
- * Indexes bindings so that they can be search for quickly when a binding to all the
- * variables is provided. If a binding to only some of the known variables is provided
- * then the index still works, but will search linearly.
- */
-public class HashIndexTable implements IndexTable {
- // Contribution from P Gearon (@quoll)
- final private Set<Key> table ;
- private Map<Var,Integer> varColumns ;
- private boolean missingValue ;
-
- public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws MissingBindingException
- {
- initColumnMappings(commonVars) ;
- if ( commonVars.size() == 0 )
- {
- table = null ;
- return ;
- }
-
- table = new HashSet<>() ;
- missingValue = false ;
-
- while ( data.hasNext() )
- {
- Binding binding = data.nextBinding() ;
- addBindingToTable(binding) ;
- }
- data.close() ;
- }
-
- @Override
- public boolean containsCompatibleWithSharedDomain(Binding binding)
- {
- // no shared variables means no shared domain, and should be ignored
- if ( table == null )
- return false ;
-
- Key indexKey ;
- indexKey = convertToKey(binding) ;
-
- if ( table.contains(indexKey) )
- return true ;
-
- if ( anyUnbound(indexKey) )
- return exhaustiveSearch(indexKey) ;
- return false ;
- }
-
- private boolean anyUnbound(Key mappedBinding)
- {
- for ( Node n: mappedBinding.getNodes() )
- {
- if ( n == null )
- return true ;
- }
- return false ;
- }
-
- private void initColumnMappings(Set<Var> commonVars)
- {
- varColumns = new HashMap<>() ;
- int c = 0 ;
- for ( Var var: commonVars )
- varColumns.put(var, c++) ;
- }
-
- private void addBindingToTable(Binding binding) throws MissingBindingException
- {
- Key key = convertToKey(binding) ;
- table.add(key) ;
- if ( missingValue )
- throw new MissingBindingException(table, varColumns) ;
- }
-
- private Key convertToKey(Binding binding)
- {
- Node[] indexKey = new Node[varColumns.size()] ;
-
- for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() )
- {
- Node value = binding.get(varCol.getKey()) ;
- if ( value == null )
- missingValue = true ;
- indexKey[varCol.getValue()] = value ;
- }
- return new Key(indexKey) ;
- }
-
- private boolean exhaustiveSearch(Key mappedBindingLeft)
- {
- for ( Key mappedBindingRight: table )
- {
- if ( mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) )
- return true ;
- }
- return false ;
- }
-
- static class MissingBindingException extends Exception {
- private final Set<Key> data ;
- private final Map<Var,Integer> varMappings ;
-
- public MissingBindingException(Set<Key> data, Map<Var,Integer> varMappings)
- {
- this.data = data ;
- this.varMappings = varMappings ;
- }
-
- public Set<Key> getData() { return data ; }
- public Map<Var,Integer> getMap() { return varMappings ; }
- }
-
- static class Key
- {
- final Node[] nodes;
-
- Key(Node[] nodes)
- {
- this.nodes = nodes ;
- }
-
- public Node[] getNodes()
- {
- return nodes;
- }
-
- @Override
- public String toString()
- {
- return Arrays.asList(nodes).toString() ;
- }
-
- @Override
- public int hashCode()
- {
- int result = 0 ;
- for ( Node n: nodes )
- result ^= (n == null) ? 0 : n.hashCode() ;
- return result ;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if ( ! (o instanceof Key) )
- return false ;
- Node[] other = ((Key)o).nodes ;
-
- for ( int i = 0 ; i < nodes.length ; i++ )
- {
- if ( nodes[i] == null)
- {
- if ( other[i] != null )
- return false ;
- }
- else
- {
- if ( ! nodes[i].equals(other[i]) )
- return false ;
- }
- }
- return true ;
- }
-
- public boolean compatibleAndSharedDomain(Key mappedBindingR)
- {
- Node[] nodesRight = mappedBindingR.getNodes() ;
-
- boolean sharedDomain = false ;
- for ( int c = 0 ; c < nodes.length ; c++ )
- {
- Node nLeft = nodes[c] ;
- Node nRight = nodesRight[c] ;
-
- if ( nLeft != null && nRight != null )
- {
- if ( nLeft.equals(nRight) )
- return false ;
- sharedDomain = true ;
- }
- }
- return sharedDomain ;
- }
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.index;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.jena.graph.Node ;
+import org.apache.jena.sparql.core.Var ;
+import org.apache.jena.sparql.engine.QueryIterator ;
+import org.apache.jena.sparql.engine.binding.Binding ;
+
+/**
+ * Indexes bindings so that they can be search for quickly when a binding to all the
+ * variables is provided. If a binding to only some of the known variables is provided
+ * then the index still works, but will search linearly.
+ */
+public class HashIndexTable implements IndexTable {
+ // Contribution from P Gearon (@quoll)
+ final private Set<Key> table ;
+ private Map<Var,Integer> varColumns ;
+ private boolean missingValue ;
+
+ public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws MissingBindingException
+ {
+ initColumnMappings(commonVars) ;
+ if ( commonVars.size() == 0 )
+ {
+ table = null ;
+ return ;
+ }
+
+ table = new HashSet<>() ;
+ missingValue = false ;
+
+ while ( data.hasNext() )
+ {
+ Binding binding = data.nextBinding() ;
+ addBindingToTable(binding) ;
+ }
+ data.close() ;
+ }
+
+ @Override
+ public boolean containsCompatibleWithSharedDomain(Binding binding)
+ {
+ // no shared variables means no shared domain, and should be ignored
+ if ( table == null )
+ return false ;
+
+ Key indexKey ;
+ indexKey = convertToKey(binding) ;
+
+ if ( table.contains(indexKey) )
+ return true ;
+
+ if ( anyUnbound(indexKey) )
+ return exhaustiveSearch(indexKey) ;
+ return false ;
+ }
+
+ private boolean anyUnbound(Key mappedBinding)
+ {
+ for ( Node n: mappedBinding.getNodes() )
+ {
+ if ( n == null )
+ return true ;
+ }
+ return false ;
+ }
+
+ private void initColumnMappings(Set<Var> commonVars)
+ {
+ varColumns = new HashMap<>() ;
+ int c = 0 ;
+ for ( Var var: commonVars )
+ varColumns.put(var, c++) ;
+ }
+
+ private void addBindingToTable(Binding binding) throws MissingBindingException
+ {
+ Key key = convertToKey(binding) ;
+ table.add(key) ;
+ if ( missingValue )
+ throw new MissingBindingException(table, varColumns) ;
+ }
+
+ private Key convertToKey(Binding binding)
+ {
+ Node[] indexKey = new Node[varColumns.size()] ;
+
+ for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() )
+ {
+ Node value = binding.get(varCol.getKey()) ;
+ if ( value == null )
+ missingValue = true ;
+ indexKey[varCol.getValue()] = value ;
+ }
+ return new Key(indexKey) ;
+ }
+
+ private boolean exhaustiveSearch(Key mappedBindingLeft)
+ {
+ for ( Key mappedBindingRight: table )
+ {
+ if ( mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) )
+ return true ;
+ }
+ return false ;
+ }
+
+ static class MissingBindingException extends Exception {
+ private final Set<Key> data ;
+ private final Map<Var,Integer> varMappings ;
+
+ public MissingBindingException(Set<Key> data, Map<Var,Integer> varMappings)
+ {
+ this.data = data ;
+ this.varMappings = varMappings ;
+ }
+
+ public Set<Key> getData() { return data ; }
+ public Map<Var,Integer> getMap() { return varMappings ; }
+ }
+
+ static class Key
+ {
+ final Node[] nodes;
+
+ Key(Node[] nodes)
+ {
+ this.nodes = nodes ;
+ }
+
+ public Node[] getNodes()
+ {
+ return nodes;
+ }
+
+ @Override
+ public String toString()
+ {
+ return Arrays.asList(nodes).toString() ;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = 0 ;
+ for ( Node n: nodes )
+ result ^= (n == null) ? 0 : n.hashCode() ;
+ return result ;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if ( ! (o instanceof Key) )
+ return false ;
+ Node[] other = ((Key)o).nodes ;
+
+ for ( int i = 0 ; i < nodes.length ; i++ )
+ {
+ if ( nodes[i] == null)
+ {
+ if ( other[i] != null )
+ return false ;
+ }
+ else
+ {
+ if ( ! nodes[i].equals(other[i]) )
+ return false ;
+ }
+ }
+ return true ;
+ }
+
+ public boolean compatibleAndSharedDomain(Key mappedBindingR)
+ {
+ Node[] nodesRight = mappedBindingR.getNodes() ;
+
+ boolean sharedDomain = false ;
+ for ( int c = 0 ; c < nodes.length ; c++ )
+ {
+ Node nLeft = nodes[c] ;
+ Node nRight = nodesRight[c] ;
+
+ if ( nLeft != null && nRight != null )
+ {
+ if ( nLeft.equals(nRight) )
+ return false ;
+ sharedDomain = true ;
+ }
+ }
+ return sharedDomain ;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
index 5828a6b..2593a54 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
@@ -1,45 +1,45 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.engine.index;
-
-import java.util.Set;
-
-import org.apache.jena.sparql.core.Var ;
-import org.apache.jena.sparql.engine.QueryIterator ;
-import org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ;
-import org.apache.jena.sparql.engine.iterator.QueryIterMinus ;
-
-/**
- * Creates {@link IndexTable}s for use by
- * {@link QueryIterMinus}.
- */
-public class IndexFactory {
- // Contribution from P Gearon (@quoll)
- public static IndexTable createIndex(Set<Var> commonVars, QueryIterator data) {
- try {
- if (commonVars.size() == 1) {
- return new SetIndexTable(commonVars, data);
- } else {
- return new HashIndexTable(commonVars, data);
- }
- } catch (MissingBindingException e) {
- return new LinearIndex(commonVars, data, e.getData(), e.getMap());
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.index;
+
+import java.util.Set;
+
+import org.apache.jena.sparql.core.Var ;
+import org.apache.jena.sparql.engine.QueryIterator ;
+import org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ;
+import org.apache.jena.sparql.engine.iterator.QueryIterMinus ;
+
+/**
+ * Creates {@link IndexTable}s for use by
+ * {@link QueryIterMinus}.
+ */
+public class IndexFactory {
+ // Contribution from P Gearon (@quoll)
+ public static IndexTable createIndex(Set<Var> commonVars, QueryIterator data) {
+ try {
+ if (commonVars.size() == 1) {
+ return new SetIndexTable(commonVars, data);
+ } else {
+ return new HashIndexTable(commonVars, data);
+ }
+ } catch (MissingBindingException e) {
+ return new LinearIndex(commonVars, data, e.getData(), e.getMap());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/3d70d735/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
index 5aa6e8a..9b18f4d 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.engine.index;
-
-import org.apache.jena.sparql.engine.binding.Binding ;
-
-/**
- * Interface for indexes that are used for identifying matching
- * {@link org.apache.jena.sparql.engine.binding.Binding}s when
- * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine
- * which Bindings need to be removed.
- */
-public interface IndexTable {
- // Contribution from P Gearon
- public abstract boolean containsCompatibleWithSharedDomain(Binding binding);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.index;
+
+import org.apache.jena.sparql.engine.binding.Binding ;
+
+/**
+ * Interface for indexes that are used for identifying matching
+ * {@link org.apache.jena.sparql.engine.binding.Binding}s when
+ * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine
+ * which Bindings need to be removed.
+ */
+public interface IndexTable {
+ // Contribution from P Gearon
+ public abstract boolean containsCompatibleWithSharedDomain(Binding binding);
+}