Posted to dev@jena.apache.org by dick-twocows <gi...@git.apache.org> on 2017/04/05 18:54:44 UTC

[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

GitHub user dick-twocows opened a pull request:

    https://github.com/apache/jena/pull/233

    Added mosaic and thrift packages to org.apache.jena.sparql.core.

    Initial pull request of DatasetGraph Mosaic and Thrift implementations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dick-twocows/jena master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/jena/pull/233.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #233
    
----
commit a00b1298dc2790be12f941727eea93a28bcbb63e
Author: Dick Murray <da...@gmail.com>
Date:   2017-04-05T18:52:33Z

    Added mosaic and thrift packages to org.apache.jena.sparql.core.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110256573
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    And of course you eventually get into (m children, more than n but fewer than l of which must be in this defined group) etc. I'm not suggesting we get into all that kind of thing right now, just that we should have a nice place to hook such calculations in.
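
One way to picture such a hook is a small predicate over the set of children that acknowledged an operation. The sketch below is illustrative only: `QuorumPolicy`, `atLeast` and `groupBetween` are hypothetical names that appear neither in the PR nor in Jena, and `groupBetween` encodes just one reading of the "more than n but fewer than l" rule.

```java
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical hook (not part of the PR or of Jena): decides whether the set
 * of children that acknowledged an operation is sufficient.
 */
interface QuorumPolicy<C> extends Predicate<Set<C>> {

    /** Satisfied when at least {@code min} children acknowledged. */
    static <C> QuorumPolicy<C> atLeast(int min) {
        return acked -> acked.size() >= min;
    }

    /**
     * Satisfied when more than {@code n} but fewer than {@code l} of the
     * acknowledging children belong to {@code group}.
     */
    static <C> QuorumPolicy<C> groupBetween(int n, int l, Set<C> group) {
        return acked -> {
            long inGroup = acked.stream().filter(group::contains).count();
            return inGroup > n && inGroup < l;
        };
    }
}
```

Because the policy is a plain `Predicate`, richer rules could be composed with `and`, `or` and `negate` rather than hard-coded into the dataset class.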



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by afs <gi...@git.apache.org>.
Github user afs commented on the issue:

    https://github.com/apache/jena/pull/233
  
    What would be a good start is an overview of what this covers.
    
    I'm looking for 2 things : what informs the core framework and what are new capabilities for systems/modules.
    
    This started with our discussions about binaries and binary protocols so that would be a good starting place.




[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Stalled to test 3.3.0 and get a customer release out...
    
    
    Dick
    -------- Original message --------
    From: "A. Soroka" <no...@github.com>
    Date: 13/05/2017 13:20 (GMT+00:00)
    To: apache/jena <je...@noreply.github.com>
    Cc: Dick Murray <di...@twocows.org>, Mention <me...@noreply.github.com>
    Subject: Re: [apache/jena] Added mosaic and thrift packages to org.apache.jena.sparql.core. (#233)
    Okay, cool! Just give a yell when you want more eyes on it. Thanks!
    



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110245986
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/thrift/IteratorCached.java ---
    @@ -0,0 +1,21 @@
    +package org.apache.jena.sparql.core.thrift;
    +
    +import java.util.Iterator;
    +
    +public abstract class IteratorCached<E> implements Iterator<E> {
    --- End diff --
    
    Yes, named more in keeping as a prefix for the actual concrete classes.



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    @afs Okay, now I see what you mean. Yeah, insofar as this gear is trying to "federate" `DatasetGraph`s, it doesn't make sense to penetrate that abstraction to reach `TriTable` and `HexTable`, which are really just implementation constructs for TIM. 



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by afs <gi...@git.apache.org>.
Github user afs commented on the issue:

    https://github.com/apache/jena/pull/233
  
    @ajs6f, have you run this?
    
    Andy wrote:
    > I'm looking for 2 things : what informs the core framework and what are new capabilities for systems/modules.
    > This started with our discussions about binaries and binary protocols so that would be a good starting place.
    
    @dick-twocows Sorry I haven't commented on this - it's big. 
    
    I understood this to be for discussion, not contribution directly at the moment. My comments/questions above still stand.
    
    What is "mosaic"? What is "mirage"?
    
    At a guess, "thrift" is the Thrift machinery, with its own encoding and the paging. Is paging necessary? What about buffered streaming?
    
    "mosaic" is the dataset machinery over thrift (but "thrift" has at least one dependency on "mosaic").
    
    What's "mirage"'s role?
    
    "spark" (and the dependencies on spark and Elephas in the POM) is not used.
    There are quite a lot of TODO items :-)
    
    Can we converge on one Thrift/binary protocol? At least reuse the current RDF encoding.
    
    I haven't seen anything yet that needs to be in jena-arq. It would be better to have more modules than to include code in the core system ("core" not in the jena-core sense).
    
    For resource and practical reasons, the project can't take on a large, complex system for which it can't provide long-term support, maintenance and enhancement. Hence: what changes to jena-arq etc. are required to support these (it's more than one thing?), either as independent projects with their own lifecycle or as separate modules?
    
    General comment : TDB2 has a proper, independent 2-phase commit which will make distributed transactions easier. I'm not seeing a prepare phase in Mosaic.
    
    @ajs6f see `StreamRDFTriHexTable`.  
    
    @dick-twocows  Does it make a difference to stream to the internal data-structures rather than the `DatasetGraph` interface?
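
For readers unfamiliar with the prepare phase mentioned above, here is a minimal two-phase commit sketch. It is an assumption-laden illustration, not TDB2's or Mosaic's implementation: `TwoPhaseParticipant` and `TwoPhaseCoordinator` are hypothetical names, and durability, timeouts and recovery are all left out.

```java
import java.util.List;

/** Hypothetical participant; this is not a Jena interface. */
interface TwoPhaseParticipant {
    /** Phase 1 vote: true means "ready to commit". */
    boolean prepare();
    /** Phase 2, taken only if every participant voted yes. */
    void commit();
    /** Roll back; called on every participant if any vote fails. */
    void abort();
}

final class TwoPhaseCoordinator {

    /** Returns true if all participants committed, false if all were aborted. */
    static boolean commitAll(List<? extends TwoPhaseParticipant> participants) {
        // Phase 1: collect votes, stopping at the first "no" or failure.
        for (TwoPhaseParticipant p : participants) {
            boolean vote;
            try {
                vote = p.prepare();
            } catch (RuntimeException e) {
                vote = false; // a failed prepare counts as a "no" vote
            }
            if (!vote) {
                participants.forEach(TwoPhaseParticipant::abort);
                return false;
            }
        }
        // Phase 2: every participant voted yes, so commit everywhere.
        participants.forEach(TwoPhaseParticipant::commit);
        return true;
    }
}
```

The point of the vote is that no child commits until every child has said it can, which a loop that simply calls `commit()` on each child in turn cannot guarantee.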
    




[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110182062
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/TransactionalDistributed.java ---
    @@ -0,0 +1,267 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.ForkJoinWorkerThread;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.Transactional;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class to support a distributed Transactional.
    + * 
    + * Thread affinity is maintained by associating a Thread per DatasetGraph. 
    + * 
    + * A TransactionalDistributed should be declared via a ThreadLocal and thus provides a MRMW.
    + * 
    + * @author dick
    + *
    + */
    +public class TransactionalDistributed implements Transactional {
    +
    +	private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalDistributed.class);
    +	
    +	protected final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    +	
    +	protected final String id = Thread.currentThread().toString();
    +
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +	
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +	
    +	protected volatile ReadWrite readWrite = null;
    +	
    +	protected final Set<DatasetGraph> children = ConcurrentHashMap.newKeySet(32);
    +	
    +	protected final ConcurrentMap<DatasetGraph, ThreadProxy> workers = new ConcurrentHashMap<>(32);
    +
    +	protected class FJWT extends ForkJoinWorkerThread {
    +
    +		public FJWT(final ForkJoinPool pool) {
    +			super(pool);
    +		}
    +		
    +	}
    +	
    +	public TransactionalDistributed() {
    +		super();
    +	}
    +	
    +	/**
    +	 * Execute the given Runnable.
    +	 */
    +	public void execute(final Runnable task) {
    +		try {
    +			forkJoinPool.submit(task).get();
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/**
    +	 * Submit the given Callable.
    +	 */
    +	public <T> Future<T> submit(final Callable<T> task) {
    +		try {
    +			return forkJoinPool.submit(task);
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/*
    +	 * Distributed transactional methods. The general goal is to maintain transactions on the child DatasetGraph's.  
    +	 */
    +
    +	public ReadWrite getType() {
    +		return readWrite;
    +	}
    +	
    +	public boolean isIn(final ReadWrite compare) {
    +		return Objects.equals(readWrite, Objects.requireNonNull(compare));
    +	}
    +	
    +	protected Set<DatasetGraph> getChildren() {
    +		return children;
    +	}
    +
    +	protected ConcurrentMap<DatasetGraph, ThreadProxy> getWorkers() {
    +		return workers;
    +	}
    +
    +	/**
    +	 * Get the ThreadProxy for the given DatasetGraph.
    +	 */
    +	protected ThreadProxy getWorker(final DatasetGraph datasetGraph) {
    +		return getWorkers().computeIfAbsent(datasetGraph, (dg) -> {return new ThreadProxy();});
    +	}
    +	
    +	/**
    +	 * Submit the given Runnable to the correct ThreadProxy based on the given datasetGraph.
    +	 */
    +	public void execute(final DatasetGraph datasetGraph, final Runnable runnable) {
    +		getWorker(datasetGraph).execute(() -> {
    +			begin(datasetGraph);
    +			runnable.run();
    +		});
    +	}
    +	
    +	/**
    +	 * Submit the given Supplier to the correct ThreadProxy based on the given datasetGraph.
    +	 */
    +	public <T> Future<T> submit(final DatasetGraph datasetGraph, final Supplier<T> supplier) {
    +		return getWorker(datasetGraph).submit(() -> {
    +			begin(datasetGraph);
    +			return supplier.get();
    +		});
    +	}
    +	
    +	/**
    +	 * Begin a transaction on the given DatasetGraph.
    +	 */
    +	protected DatasetGraph begin(final DatasetGraph datasetGraph) {
    +		if (!isInTransaction()) {
    +			throw new JenaException("No parent transaction");
    +		}
    +		try {
    +			if (!children.contains(datasetGraph)) {
    +				if (datasetGraph.supportsTransactions()) {
    +					datasetGraph.begin(readWrite);
    +				}
    +				children.add(datasetGraph);
    +			}
    +			return datasetGraph;
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/*
    +	 * Transactional
    +	 */
    +	
    +	@Override
    +	public void begin(final ReadWrite readWrite) {
    +		if (this.readWrite != null) {
    +			throw new JenaException("Already in a transaction " + this.readWrite);
    +		}
    +		this.readWrite = readWrite;
    +	}
    +	
    +	@Override
    +	public void commit() {
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions()) {
    +							datasetGraph.commit();
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    +			throw jenaException;
    +		}
    +	}
    +	
    +	@Override
    +	public void abort() {
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions() && datasetGraph.supportsTransactionAbort()) {
    +							datasetGraph.abort();
    +						} else {
    +							LOGGER.warn("Attempt to call Transactional.abort().", datasetGraph.getClass());
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    +			throw jenaException;
    +		}
    +	}
    +	
    +	@Override
    +	public void end() {
    +		if (readWrite != null && readWrite.equals(ReadWrite.WRITE)) {
    +			LOGGER.warn("End without commit/abort");
    +		}
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions()) {
    +							datasetGraph.end();
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.workers.values().forEach(threadDelegate -> {
    --- End diff --
    
    Just a style thing, but this could be point-free `this.workers.values().forEach(ThreadProxy::close)`.
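
As an illustration of the style point, the two forms below behave the same. `Worker` is a hypothetical stand-in for `ThreadProxy`, whose `close()` signature is not visible in the quoted diff; the method-reference form compiles only if `close()` declares no checked exception.

```java
import java.util.Map;

final class PointFreeSketch {

    /** Hypothetical stand-in for ThreadProxy; only close() matters here. */
    static final class Worker {
        boolean closed;
        void close() { closed = true; }
    }

    /** Lambda form, as in the diff under review. */
    static void closeAllLambda(Map<String, Worker> workers) {
        workers.values().forEach(worker -> { worker.close(); });
    }

    /** Point-free form: the method reference names the operation directly. */
    static void closeAllPointFree(Map<String, Worker> workers) {
        workers.values().forEach(Worker::close);
    }
}
```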



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    There is a lot of cool functionality here, but I don't see any tests at all. Are they coming in a separate PR?



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Ah, okay, cool. (Re: Javadocs)



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110245267
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/TransactionalDistributed.java ---
    @@ -0,0 +1,267 @@
    --- End diff --
    
    There'll be more of these ;-)



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Here's an overview for starters.
    DatasetGraphThrift is a Thrift-based wrapper for an existing DatasetGraph. It includes a Java IFace and Server implementation, plus instructions on how to access the wrapped DSG from another language. This allows a DSG to be accessed from another JVM. This is the binary part.
    DatasetGraphMosaic wraps multiple DSGs and presents them as one DSG, which is useful when the wrapped DSGs are Thrift DSGs. It applies optimistic locking and wraps the transaction with a try mechanism: effectively MRMW with no contention unless writes go to the same DSG. This is the clustering part.
    DatasetGraphMirage uses rules to automagically produce quads in response to the find methods. An example Ray is a folder that is mapped between a urn: scheme and a file: scheme and can load triples and quads to fulfil the find method calls. This is the dynamic part.
    I think Thrift is a candidate for core. For Mosaic and Mirage I'm thinking more of an associated POM...
    Dick
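
The "many DSGs presented as one" idea described above can be sketched as a fan-out that applies one operation to every child and concatenates the results. This is a simplified illustration, loosely modelled on the `mosaicIterator(Function<DatasetGraph, Iterator<T>>)` convenience method named in the class Javadoc; `MosaicSketch` and `mosaicStream` are hypothetical names, the child type is left generic, and transactions and locking are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

/** Hypothetical sketch: presents several children of type D as one source. */
final class MosaicSketch<D> {

    private final List<D> children;

    MosaicSketch(List<D> children) {
        this.children = new ArrayList<>(children); // defensive copy
    }

    /** Applies op to each child in order and concatenates the results. */
    <T> Stream<T> mosaicStream(Function<D, Stream<T>> op) {
        return children.stream().flatMap(op);
    }
}
```

With a real `DatasetGraph` as `D`, `op` would presumably wrap a `find(...)` call, so a caller sees one stream of quads regardless of how many children back it.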
    -------- Original message --------
    From: Andy Seaborne <no...@github.com>
    Date: 13/05/2017 19:19 (GMT+00:00)
    To: apache/jena <je...@noreply.github.com>
    Cc: Dick Murray <di...@twocows.org>, Mention <me...@noreply.github.com>
    Subject: Re: [apache/jena] Added mosaic and thrift packages to org.apache.jena.sparql.core. (#233)
    What would be a good start is an overview of what this covers.
    I'm looking for 2 things : what informs the core framework and what are new capabilities for systems/modules.
    This started with our discussions about binaries and binary protocols so that would be a good starting place.
    



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    @dick-twocows , am I right in thinking that the assumption you are making about the "mosaic pieces", the child-datasetgraphs, is that they are MR+SW? Or are you somehow being agnostic to that?



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110245621
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/TransactionalDistributed.java ---
    @@ -0,0 +1,267 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.ForkJoinWorkerThread;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.Transactional;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class to support a distributed Transactional.
    + * 
    + * Thread affinity is maintained by associating a Thread per DatasetGraph. 
    + * 
    + * A TransactionalDistributed should be declared via a ThreadLocal and thus provides a MRMW.
    + * 
    + * @author dick
    + *
    + */
    +public class TransactionalDistributed implements Transactional {
    +
    +	private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalDistributed.class);
    +	
    +	protected final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    +	
    +	protected final String id = Thread.currentThread().toString();
    +
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +	
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +	
    +	protected volatile ReadWrite readWrite = null;
    +	
    +	protected final Set<DatasetGraph> children = ConcurrentHashMap.newKeySet(32);
    +	
    +	protected final ConcurrentMap<DatasetGraph, ThreadProxy> workers = new ConcurrentHashMap<>(32);
    +
    +	protected class FJWT extends ForkJoinWorkerThread {
    +
    +		public FJWT(final ForkJoinPool pool) {
    +			super(pool);
    +		}
    +		
    +	}
    +	
    +	public TransactionalDistributed() {
    +		super();
    +	}
    +	
    +	/**
    +	 * Execute the given Runnable.
    +	 */
    +	public void execute(final Runnable task) {
    +		try {
    +			forkJoinPool.submit(task).get();
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/**
    +	 * Submit the given Callable.
    +	 */
    +	public <T> Future<T> submit(final Callable<T> task) {
    +		try {
    +			return forkJoinPool.submit(task);
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/*
    +	 * Distributed transactional methods. The general goal is to maintain transactions on the child DatasetGraph's.  
    +	 */
    +
    +	public ReadWrite getType() {
    +		return readWrite;
    +	}
    +	
    +	public boolean isIn(final ReadWrite compare) {
    +		return Objects.equals(readWrite, Objects.requireNonNull(compare));
    +	}
    +	
    +	protected Set<DatasetGraph> getChildren() {
    +		return children;
    +	}
    +
    +	protected ConcurrentMap<DatasetGraph, ThreadProxy> getWorkers() {
    +		return workers;
    +	}
    +
    +	/**
    +	 * Get the ThreadProxy for the given DatasetGraph.
    +	 */
    +	protected ThreadProxy getWorker(final DatasetGraph datasetGraph) {
    +		return getWorkers().computeIfAbsent(datasetGraph, (dg) -> {return new ThreadProxy();});
    +	}
    +	
    +	/**
    +	 * Submit the given Runnable to the correct ThreadProxy based on the given datasetGraph.
    +	 */
    +	public void execute(final DatasetGraph datasetGraph, final Runnable runnable) {
    +		getWorker(datasetGraph).execute(() -> {
    +			begin(datasetGraph);
    +			runnable.run();
    +		});
    +	}
    +	
    +	/**
    +	 * Submit the given Supplier to the correct ThreadProxy based on the given datasetGraph.
    +	 */
    +	public <T> Future<T> submit(final DatasetGraph datasetGraph, final Supplier<T> supplier) {
    +		return getWorker(datasetGraph).submit(() -> {
    +			begin(datasetGraph);
    +			return supplier.get();
    +		});
    +	}
    +	
    +	/**
    +	 * Begin a transaction on the given DatasetGraph.
    +	 */
    +	protected DatasetGraph begin(final DatasetGraph datasetGraph) {
    +		if (!isInTransaction()) {
    +			throw new JenaException("No parent transaction");
    +		}
    +		try {
    +			if (!children.contains(datasetGraph)) {
    +				if (datasetGraph.supportsTransactions()) {
    +					datasetGraph.begin(readWrite);
    +				}
    +				children.add(datasetGraph);
    +			}
    +			return datasetGraph;
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/*
    +	 * Transactional
    +	 */
    +	
    +	@Override
    +	public void begin(final ReadWrite readWrite) {
    +		if (this.readWrite != null) {
    +			throw new JenaException("Already in a transaction " + this.readWrite);
    +		}
    +		this.readWrite = readWrite;
    +	}
    +	
    +	@Override
    +	public void commit() {
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions()) {
    +							datasetGraph.commit();
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    +			throw jenaException;
    +		}
    +	}
    +	
    +	@Override
    +	public void abort() {
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions() && datasetGraph.supportsTransactionAbort()) {
    +							datasetGraph.abort();
    +						} else {
    +							LOGGER.warn("Attempt to call Transactional.abort().", datasetGraph.getClass());
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    +			throw jenaException;
    +		}
    +	}
    +	
    +	@Override
    +	public void end() {
    +		if (readWrite != null && readWrite.equals(ReadWrite.WRITE)) {
    +			LOGGER.warn("End without commit/abort");
    +		}
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions()) {
    +							datasetGraph.end();
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.workers.values().forEach(threadDelegate -> {
    +			threadDelegate.close();
    +		});
    +		this.workers.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    --- End diff --
    
    The same code is repeated here too, so it needs pulling out into a common method!
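
One way the repeated "run on every child, collect what threw, then rethrow" block might be pulled out is sketched below. This is only an illustration under stated assumptions: `ChildActions` and `forEachCollecting` are hypothetical names, a plain `RuntimeException` stands in for `JenaException`, and the per-child `ThreadProxy` hop is left out so the example needs nothing beyond the JDK.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; RuntimeException stands in for JenaException.
final class ChildActions {

    private ChildActions() {
    }

    /**
     * Applies the action to every child, even if some children throw,
     * then raises one aggregate exception carrying each failure as suppressed.
     */
    static <T> void forEachCollecting(final Collection<T> children, final Consumer<? super T> action) {
        final List<RuntimeException> threw = new ArrayList<>();
        for (final T child : children) {
            try {
                action.accept(child);
            } catch (final RuntimeException exception) {
                threw.add(exception);
            }
        }
        if (!threw.isEmpty()) {
            final RuntimeException aggregate =
                    new RuntimeException(threw.size() + " child action(s) failed");
            threw.forEach(aggregate::addSuppressed);
            throw aggregate;
        }
    }
}
```

With something like this in place, `commit()`, `abort()` and `end()` would each reduce to one call that passes the per-child action, followed by their own cleanup of `children` and `readWrite`.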



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Re mosaic pieces: there is a dataset graph try class which wraps a dataset with the appropriate transactional try, based on which concrete implementation it is (hence my previous comment about cross-referencing TDB from ARQ), so that part is currently commented out. By default it assumes MR+SW. If it's out of sync, it will block when the call comes through to the wrapped dataset graph.



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    I'll try to keep this brief. Two main parts to the PR, Mosaic and Thrift.
    
    Mosaic provides a way to aggregate multiple dataset graphs as one, using optimistic transactions. Every thread acquires a read or a write on the mosaic, but a transaction on a child begins only when that child is touched. This allows multiple writers indirectly: if you need more parallel writes, just load more children.
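
The "begin only when a child is touched" behaviour can be sketched roughly as follows. This is a single-threaded illustration, not the PR's code: `TxChild` and `LazyParentTx` are hypothetical stand-ins for a child dataset graph and for the per-thread parent transaction, and the thread-affinity handling is omitted.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a transactional child dataset graph.
interface TxChild {
    void begin(boolean write);

    void commit();
}

// Hypothetical parent transaction: records the mode up front, begins children lazily.
final class LazyParentTx {

    private final Set<TxChild> touched = new LinkedHashSet<>();

    // null means no parent transaction is open.
    private Boolean write;

    void begin(final boolean write) {
        if (this.write != null) {
            throw new IllegalStateException("Already in a transaction");
        }
        this.write = write;
    }

    /** Begins the child's transaction on first touch only, then returns the child. */
    TxChild touch(final TxChild child) {
        if (write == null) {
            throw new IllegalStateException("No parent transaction");
        }
        if (touched.add(child)) {
            child.begin(write);
        }
        return child;
    }

    /** Commits only the children that were touched and returns how many there were. */
    int commit() {
        final int count = touched.size();
        touched.forEach(TxChild::commit);
        touched.clear();
        write = null;
        return count;
    }
}
```

Children that are never touched never see a `begin`, which is why writes to different children do not contend with each other.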
    
    Thrift provides access to a dataset graph running in another JVM, with full transaction support. Put simply, the Thrift service mimics the dataset graph interface with the addition of an ID on most calls. The ID allows the Thrift server to route the Thrift thread pool threads to the correct thread proxy, maintaining thread affinity for remote transactions. The iterator cache classes, specifically the page-based ones, cache the quads or triples to reduce round trips over the network.
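
The ID-based routing described above might look something like the sketch below: each ID gets its own single-threaded executor, so every call carrying that ID runs on the same thread regardless of which pool thread received it. `AffinityRouter` and its methods are hypothetical names for illustration and no Thrift types are involved.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical router: one dedicated worker thread per transaction ID.
final class AffinityRouter implements AutoCloseable {

    private final ConcurrentMap<String, ExecutorService> workers = new ConcurrentHashMap<>();

    private ExecutorService workerFor(final String id) {
        return workers.computeIfAbsent(id, key -> Executors.newSingleThreadExecutor(runnable -> {
            final Thread thread = new Thread(runnable, "affinity-" + key);
            thread.setDaemon(true); // do not keep the JVM alive for idle workers
            return thread;
        }));
    }

    /** Runs the task on the worker thread owned by the given ID and waits for the result. */
    <T> T call(final String id, final Callable<T> task) {
        try {
            return workerFor(id).submit(task).get();
        } catch (final InterruptedException exception) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(exception);
        } catch (final ExecutionException exception) {
            throw new IllegalStateException(exception.getCause());
        }
    }

    @Override
    public void close() {
        workers.values().forEach(ExecutorService::shutdown);
        workers.clear();
    }
}
```

A server handler would then wrap each incoming call as `router.call(id, () -> ...)`, so that begin, find and commit for one remote transaction all land on one thread.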
    
    If you add Thrift dataset graphs to the Mosaic you get a clustered dataset graph, i.e. multiple TDBs each running in a separate JVM which can be accessed using the standard Jena API.



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Well that confirmed the PR worked :-)
    
    Thanks for the pointer and I'm sure there are a few other places where this
    occurs.
    
    That class has an issue: before being ported into the Jena fork it
    referenced jena-tdb for the two commented-out class definitions. Those I can
    replace with an FQCN, but the cast to getWrapped() has got me stumped! It
    doesn't stop it working, but it needs a fix.
    
    There's a thread on the Jena mailing list which mentions (to Andy) that this
    was a test PR and that it needs the Thrift file compiled...
    
    
    On 5 Apr 2017 8:57 pm, "A. Soroka" <no...@github.com> wrote:
    
    *@ajs6f* commented on this pull request.
    ------------------------------
    
    In jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/Topology.java
    <https://github.com/apache/jena/pull/233#discussion_r110013073>:
    
    > +import org.apache.jena.sparql.util.Symbol;
    +
    +public class Topology {
    +
    +	protected static final Map<Symbol, Metric> ENTRIES;
    +	
    +	public static Symbol keyFor(final Class<? extends DatasetGraph> c) {
    +		return Symbol.create(c.getName());
    +	}
    +	
    +	static {
    +		final Map<Symbol, Metric> entries = new HashMap<>();
    +		
    +		entries.put(keyFor(DatasetGraph.class), new Metric());
    +		
    +		entries.put(keyFor(DatasetGraphInMemory.class), new Metric());
    
    Just a style thing, but
    org.apache.jena.ext.com.google.common.collect.ImmutableMap.of(K,
    V, K, V) would let you build this right in the field declaration, no static
    stanza needed.
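
The field-initializer style suggested here can be sketched with the JDK's own `Map.of` (Java 9 and later), swapped in for the shaded Guava `ImmutableMap.of` so that the example has no dependencies. `TopologySketch`, the empty `Metric` class and the plain string keys are stand-ins for the PR's `Topology`, `Metric` and `Symbol` types.

```java
import java.util.Map;

// Stand-in for the PR's Metric type.
final class Metric {
}

// Stand-in for Topology: the map is built in the field declaration, with no static block.
final class TopologySketch {

    static final Map<String, Metric> ENTRIES = Map.of(
            "DatasetGraph", new Metric(),
            "DatasetGraphInMemory", new Metric());

    private TopologySketch() {
    }
}
```

Like Guava's immutable maps, the map returned by `Map.of` rejects later modification, so the field cannot be changed after class initialization.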
    




[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    All the code, if accepted, needs to be properly annotated; I've removed the
    original project-specific annotations...
    
    On 6 Apr 2017 3:39 pm, "A. Soroka" <no...@github.com> wrote:
    
    > *@ajs6f* commented on this pull request.
    > ------------------------------
    >
    > In jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/LockTry.java
    > <https://github.com/apache/jena/pull/233#discussion_r110179015>:
    >
    > > @@ -0,0 +1,8 @@
    > +package org.apache.jena.sparql.core.mosaic;
    > +
    > +import org.apache.jena.shared.Lock;
    > +
    > +public interface LockTry extends Lock {
    > +
    > +	boolean tryEnterCriticalSection(boolean readLockRequested);
    >
    > It might be nice to put a Javadoc comment on this to make the required
    > semantics precisely known. They seem clear now, but as Jena's concurrency
    > evolves, they might get less so.
    >




[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    The ability to try to begin a write transaction exists for two reasons.
    
    We needed to run parallel loads, i.e. try each child for a write, and the first one to return true performs the load. If you need more parallelism, spin up more tin/JVMs.
    
    It also provides optimistic access, i.e. try to read and, if that fails, skip the child; no problem, we're open world...
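
The "try each child, first one to return true performs the load" idea might be sketched like this. It is a simplified illustration: `TryWriteChild` and `ParallelLoad` are hypothetical names, and a one-permit `Semaphore` stands in for whatever try-lock the real children expose (for example a `LockTry` implementation).

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical stand-in for a child dataset that admits one writer at a time.
final class TryWriteChild {

    final String name;

    private final Semaphore writer = new Semaphore(1);

    TryWriteChild(final String name) {
        this.name = name;
    }

    /** Non-blocking: true if the write slot was free and is now held by the caller. */
    boolean tryBeginWrite() {
        return writer.tryAcquire();
    }

    void endWrite() {
        writer.release();
    }
}

final class ParallelLoad {

    private ParallelLoad() {
    }

    /** Runs the load on the first child whose write slot is free; empty if all are busy. */
    static Optional<TryWriteChild> loadIntoFirstFree(final List<TryWriteChild> children,
                                                     final Consumer<TryWriteChild> load) {
        for (final TryWriteChild child : children) {
            if (child.tryBeginWrite()) {
                try {
                    load.accept(child);
                } finally {
                    child.endWrite();
                }
                return Optional.of(child);
            }
        }
        return Optional.empty();
    }
}
```

Several loader threads calling `loadIntoFirstFree` at once would each settle on a different free child, so adding children adds write parallelism. The optimistic read case would follow the same shape, except that a busy child is simply skipped.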



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110254074
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    Ah, okay, cool, that was one of my biggest questions (sharding policy). I'm glad to see you've got an extension point ready, nicely done. My immediate sense would be that we want a policy injection point near here, into which you could put a type that takes a quad and checks it against some record of the available "children" and makes that decision. 
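
A policy injection point of the kind described above could be sketched as a small interface that is handed a quad and the available children and returns the child to write to. Everything here is hypothetical: `ShardingPolicy`, `SimpleQuad` and `GraphHashPolicy` are illustrative names, strings stand in for Jena's `Node`, and hashing on the graph name is just one possible policy.

```java
import java.util.List;

// Hypothetical extension point: decides which child receives a given quad.
interface ShardingPolicy<Q, C> {
    C choose(Q quad, List<C> children);
}

// Hypothetical minimal quad; strings stand in for Jena Nodes.
final class SimpleQuad {

    final String graph;
    final String subject;
    final String predicate;
    final String object;

    SimpleQuad(final String graph, final String subject, final String predicate, final String object) {
        this.graph = graph;
        this.subject = subject;
        this.predicate = predicate;
        this.object = object;
    }
}

// Example policy: all quads of one named graph go to the same child.
final class GraphHashPolicy<C> implements ShardingPolicy<SimpleQuad, C> {

    @Override
    public C choose(final SimpleQuad quad, final List<C> children) {
        if (children.isEmpty()) {
            throw new IllegalArgumentException("No children to choose from");
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        return children.get(Math.floorMod(quad.graph.hashCode(), children.size()));
    }
}
```

The write shim's `add` could then delegate to `policy.choose(quad, children)` instead of hard-coding the decision, and other policies (round-robin, least-loaded, by subject) would become drop-in replacements.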



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    I've added the core Thrift classes and three test classes which should give a better understanding of what the code is trying to do...



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110182274
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/TransactionalDistributed.java ---
    @@ -0,0 +1,267 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.ForkJoinWorkerThread;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.Transactional;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class to support a distributed Transactional.
    + * 
    + * Thread affinity is maintained by associating a Thread per DatasetGraph. 
    + * 
    + * A TransactionalDistributed should be declared via a ThreadLocal and thus provides a MRMW.
    + * 
    + * @author dick
    + *
    + */
    +public class TransactionalDistributed implements Transactional {
    +
    +	private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalDistributed.class);
    +	
    +	protected final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    +	
    +	protected final String id = Thread.currentThread().toString();
    +
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +	
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +	
    +	protected volatile ReadWrite readWrite = null;
    +	
    +	protected final Set<DatasetGraph> children = ConcurrentHashMap.newKeySet(32);
    +	
    +	protected final ConcurrentMap<DatasetGraph, ThreadProxy> workers = new ConcurrentHashMap<>(32);
    +
    +	protected class FJWT extends ForkJoinWorkerThread {
    +
    +		public FJWT(final ForkJoinPool pool) {
    +			super(pool);
    +		}
    +		
    +	}
    +	
    +	public TransactionalDistributed() {
    +		super();
    +	}
    +	
    +	/**
    +	 * Execute the given Runnable.
    +	 */
    +	public void execute(final Runnable task) {
    +		try {
    +			forkJoinPool.submit(task).get();
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/**
    +	 * Submit the given Callable.
    +	 */
    +	public <T> Future<T> submit(final Callable<T> task) {
    +		try {
    +			return forkJoinPool.submit(task);
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/*
    +	 * Distributed transactional methods. The general goal is to maintain transactions on the child DatasetGraph's.  
    +	 */
    +
    +	public ReadWrite getType() {
    +		return readWrite;
    +	}
    +	
    +	public boolean isIn(final ReadWrite compare) {
    +		return Objects.equals(readWrite, Objects.requireNonNull(compare));
    +	}
    +	
    +	protected Set<DatasetGraph> getChildren() {
    +		return children;
    +	}
    +
    +	protected ConcurrentMap<DatasetGraph, ThreadProxy> getWorkers() {
    +		return workers;
    +	}
    +
    +	/**
    +	 * Get the ThreadProxy for the given DatasetGraph.
    +	 */
    +	protected ThreadProxy getWorker(final DatasetGraph datasetGraph) {
    +		return getWorkers().computeIfAbsent(datasetGraph, (dg) -> {return new ThreadProxy();});
    +	}
    +	
    +	/**
    +	 * Submit the given Runnable to the correct ThreadProxy based on the given datasetGraph.
    +	 */
    +	public void execute(final DatasetGraph datasetGraph, final Runnable runnable) {
    +		getWorker(datasetGraph).execute(() -> {
    +			begin(datasetGraph);
    +			runnable.run();
    +		});
    +	}
    +	
    +	/**
    +	 * Submit the given Supplier to the correct ThreadProxy based on the given datasetGraph.
    +	 */
    +	public <T> Future<T> submit(final DatasetGraph datasetGraph, final Supplier<T> supplier) {
    +		return getWorker(datasetGraph).submit(() -> {
    +			begin(datasetGraph);
    +			return supplier.get();
    +		});
    +	}
    +	
    +	/**
    +	 * Begin a transaction on the given DatasetGraph.
    +	 */
    +	protected DatasetGraph begin(final DatasetGraph datasetGraph) {
    +		if (!isInTransaction()) {
    +			throw new JenaException("No parent transaction");
    +		}
    +		try {
    +			if (!children.contains(datasetGraph)) {
    +				if (datasetGraph.supportsTransactions()) {
    +					datasetGraph.begin(readWrite);
    +				}
    +				children.add(datasetGraph);
    +			}
    +			return datasetGraph;
    +		} catch (final Exception exception) {
    +			throw new JenaException(exception);
    +		}
    +	}
    +	
    +	/*
    +	 * Transactional
    +	 */
    +	
    +	@Override
    +	public void begin(final ReadWrite readWrite) {
    +		if (this.readWrite != null) {
    +			throw new JenaException("Already in a transaction " + this.readWrite);
    +		}
    +		this.readWrite = readWrite;
    +	}
    +	
    +	@Override
    +	public void commit() {
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions()) {
    +							datasetGraph.commit();
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    +			throw jenaException;
    +		}
    +	}
    +	
    +	@Override
    +	public void abort() {
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions() && datasetGraph.supportsTransactionAbort()) {
    +							datasetGraph.abort();
    +						} else {
    +							LOGGER.warn("Attempt to call Transactional.abort().", datasetGraph.getClass());
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    +			throw jenaException;
    +		}
    +	}
    +	
    +	@Override
    +	public void end() {
    +		if (readWrite != null && readWrite.equals(ReadWrite.WRITE)) {
    +			LOGGER.warn("End without commit/abort");
    +		}
    +		List<Exception> threw = new LinkedList<>();
    +		children
    +			.forEach(datasetGraph -> {
    +				try {
    +					getWorker(datasetGraph).execute(() -> {
    +						if (datasetGraph.supportsTransactions()) {
    +							datasetGraph.end();
    +						}
    +					});
    +				} catch (final Exception exception) {
    +					threw.add(exception);
    +				}
    +			});
    +		this.children.clear();
    +		this.workers.values().forEach(threadDelegate -> {
    +			threadDelegate.close();
    +		});
    +		this.workers.clear();
    +		this.readWrite = null;
    +		if (!threw.isEmpty()) {
    +			final JenaException jenaException = new JenaException();
    +			threw.forEach((exception) -> {jenaException.addSuppressed(exception);});
    --- End diff --
    
    Similar to above, could be `threw.forEach(jenaException::addSuppressed)`.
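As a rough illustration of the method-reference suggestion, here is a minimal, self-contained sketch. It assumes a plain `RuntimeException` as a stand-in for `JenaException`, and the names `SuppressedDemo` and `runAll` are hypothetical rather than anything in the pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; RuntimeException stands in for JenaException.
class SuppressedDemo {

    // Runs every task, collects any failures, and rethrows them as one exception.
    static void runAll(List<Runnable> tasks) {
        List<Exception> threw = new ArrayList<>();
        for (Runnable task : tasks) {
            try {
                task.run();
            } catch (Exception exception) {
                threw.add(exception);
            }
        }
        if (!threw.isEmpty()) {
            RuntimeException aggregate = new RuntimeException("One or more tasks failed");
            // Method reference, equivalent to: e -> { aggregate.addSuppressed(e); }
            threw.forEach(aggregate::addSuppressed);
            throw aggregate;
        }
    }
}
```

The method reference behaves the same as the block lambda in the diff; it only removes the braces and the explicit parameter.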



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110245723
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A Thread proxy which performs actions using one thread (the proxy).
    + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>).
    + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService.
    + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...)
    + * 
    + * @author dick
    + *
    + */
    +public class ThreadProxy {
    --- End diff --
    
    That's fine, I'm not talking about the name. I'm suggesting that `ThreadProxy` might be a natural impl of `ExecutorService`, which would keep it within a publicly-defined contract. Not a big deal, just a thought.
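One way to picture this suggestion is the sketch below. It is only an assumption about what such an implementation could look like, not the pull request's code: the class name `SingleThreadProxyService` is hypothetical, and it extends the JDK's `AbstractExecutorService` so that `submit` and `invokeAll` come for free.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-threaded proxy exposed through the standard ExecutorService contract.
class SingleThreadProxyService extends AbstractExecutorService {

    // All work is delegated to one worker thread, giving FIFO ordering and thread affinity.
    private final ExecutorService delegate = Executors.newSingleThreadExecutor();

    @Override
    public void execute(Runnable command) {
        delegate.execute(command);
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }
}
```

Callers would then depend only on `ExecutorService`, and the counters and ids from the original class could be layered on top if they are still wanted.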



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    There's some more to commit and I want the tests in as well...
    
    
    Dick
    -------- Original message --------
    From: "A. Soroka" <no...@github.com>
    Date: 13/05/2017 13:06 (GMT+00:00)
    To: apache/jena <je...@noreply.github.com>
    Cc: Dick Murray <di...@twocows.org>, Mention <me...@noreply.github.com>
    Subject: Re: [apache/jena] Added mosaic and thrift packages to org.apache.jena.sparql.core. (#233)
    
    Hey, @dick-twocows -- just checking in on this. Are you still adding more commits or is it time to move on to in-depth review? Thanks again for this contribution!
    



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110179015
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/LockTry.java ---
    @@ -0,0 +1,8 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import org.apache.jena.shared.Lock;
    +
    +public interface LockTry extends Lock {
    +
    +	boolean tryEnterCriticalSection(boolean readLockRequested);
    --- End diff --
    
    It might be nice to put a Javadoc comment on this to make the required semantics precisely known. They seem clear now, but as Jena's concurrency evolves, they might get less so.
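For illustration, such a Javadoc comment might read roughly as follows. The semantics written here (non-blocking attempt, caller releases only on success) are an assumption about what the author intends, not something stated in the pull request. `TryLockSketch` and `ReadWriteTryLock` are hypothetical names, and the interface omits the Jena `Lock` supertype so that the example is self-contained.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the proposed LockTry interface, without the Jena Lock supertype.
interface TryLockSketch {

    /**
     * Attempts to enter a critical section without blocking.
     *
     * @param readLockRequested true to request a shared (read) lock,
     *                          false to request an exclusive (write) lock
     * @return true if the lock was acquired, in which case the caller must later call
     *         leaveCriticalSection(); false if it was not acquired, in which case
     *         the caller must not call leaveCriticalSection()
     */
    boolean tryEnterCriticalSection(boolean readLockRequested);

    /** Releases the lock held by the current thread. */
    void leaveCriticalSection();
}

// One possible implementation of the assumed semantics on top of a JDK read/write lock.
class ReadWriteTryLock implements TryLockSketch {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    @Override
    public boolean tryEnterCriticalSection(boolean readLockRequested) {
        return readLockRequested ? lock.readLock().tryLock() : lock.writeLock().tryLock();
    }

    @Override
    public void leaveCriticalSection() {
        // Release whichever lock this thread holds; the write lock is checked first.
        if (lock.isWriteLockedByCurrentThread()) {
            lock.writeLock().unlock();
        } else {
            lock.readLock().unlock();
        }
    }
}
```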



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Hey, @dick-twocows -- just checking in on this. Are you still adding more commits or is it time to move on to in-depth review? Thanks again for this contribution!



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110252024
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    Just noticed this (and the one below). I'm probably missing something painfully obvious here (sorry!) but with NOOP `add` methods, how does anything ever get written? 



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110256011
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    Replication, write the quad to at least n children..?



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110245113
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A Thread proxy which performs actions using one thread (the proxy).
    + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>).
    + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService.
    + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...)
    + * 
    + * @author dick
    + *
    + */
    +public class ThreadProxy {
    --- End diff --
    
    I wanted it named for what it does: it funnels calls FIFO through a single proxy thread, leveraging everything an executor service already provides, to give Jena the thread affinity it needs.



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Okay, cool! Just give a yell when you want more eyes on it. Thanks!



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110256243
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    Exactly. For hot failover and so forth.



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110180946
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A Thread proxy which performs actions using one thread (the proxy).
    + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>).
    + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService.
    + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...)
    + * 
    + * @author dick
    + *
    + */
    +public class ThreadProxy {
    --- End diff --
    
    This whole class seems a bit like a special-purpose `ExecutorService`. Perhaps it might be better to use that type rather than introducing a new, independent one...



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Hey, @dick-twocows and @afs, just picking up this conversation. Thanks for the work so far, @dick-twocows! Do you feel like this is in a state ready for in-depth review, or are you still working with it? @afs, does @dick-twocows's comment above give a good sense of the contribution, or were you looking for something more in-depth? I think it makes a good outline and there's not much point to filling in a lot of detail until we are sure the contribution is close to finished.
    
    I think it would be great to get this into the next release and I would be happy to a) work with @dick-twocows to help make that happen and b) cut that release. (Although as I never tire of complaining, it would also be great for another committer to do that :) ).



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110255601
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    We already have a shim which checks the quad's graph and adds the quad to each child which contains that graph.
    
    Without complicating this, modifying the Thrift dataset has its own issues, because each single add call hits the network. So either writes are cached and a flush is triggered by a read, or the read from the remote is aggregated with the local cache to maintain read consistency. Which one depends on what you want: latency vs. safety.
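The two read strategies described here can be sketched as follows. This is a simplified model under stated assumptions, not the Thrift implementation: strings stand in for quads, an in-memory set stands in for the remote store, and every name (`WriteBehindStore`, `containsAfterFlush`, `containsMerged`) is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: Strings stand in for quads, a Set stands in for the remote store.
class WriteBehindStore {

    private final Set<String> remote = new HashSet<>();    // pretend each bulk write here is one network call
    private final List<String> pending = new ArrayList<>(); // locally cached writes
    private int flushCount = 0;

    // Writes are buffered locally instead of hitting the network one quad at a time.
    void add(String quad) {
        pending.add(quad);
    }

    // Strategy 1: a read triggers a flush, then consults the remote only.
    boolean containsAfterFlush(String quad) {
        flush();
        return remote.contains(quad);
    }

    // Strategy 2: no flush; the remote answer is merged with the local cache.
    boolean containsMerged(String quad) {
        return pending.contains(quad) || remote.contains(quad);
    }

    // Sends all pending writes in one batch; does nothing if the cache is empty.
    void flush() {
        if (!pending.isEmpty()) {
            remote.addAll(pending);
            pending.clear();
            flushCount++;
        }
    }

    int flushCount() {
        return flushCount;
    }
}
```

In this model the merged read avoids a round trip but leaves unflushed data at risk if the client fails, while flush-on-read pays the round trip before answering.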



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110180316
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A Thread proxy which performs actions using one thread (the proxy).
    + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>).
    + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService.
    + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...)
    + * 
    + * @author dick
    + *
    + */
    +public class ThreadProxy {
    +
    +	private static final Logger LOGGER = LoggerFactory.getLogger(ThreadProxy.class);
    +	
    +	protected static final IDFactory ID_FACTORY = IDFactory.valueOf(ThreadProxy.class);
    +	
    +	protected static final ThreadGroup THREAD_GROUP = new ThreadGroup(ThreadProxy.class.getName());
    +	
    +	protected final String id;
    +	
    +	protected final String createdBy;
    +	
    +	protected final ExecutorService executorService;
    +	
    +	protected final AtomicInteger executeCount;
    +	
    +	protected final AtomicInteger submitCount;
    +	
    +	protected final AtomicInteger exceptionCount;
    +	
    +	public ThreadProxy() {
    +		super();
    +		id = ID_FACTORY.next();
    +		createdBy =  Thread.currentThread().getName();
    +		executorService = new ThreadPoolExecutor(1, 1,
    --- End diff --
    
    Are you doing this (instead of, say, `java.util.concurrent.Executors.newSingleThreadExecutor()`) especially to control the thread group and id?
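For comparison, thread group and naming can also be controlled through the `ThreadFactory` overload of `Executors.newSingleThreadExecutor`. The sketch below is an assumption about how that might look; `NamedSingleThread`, the group name, and the naming scheme are all hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing a single-thread executor with a custom thread group and name.
class NamedSingleThread {

    private static final ThreadGroup GROUP = new ThreadGroup("thread-proxy-sketch");
    private static final AtomicInteger COUNTER = new AtomicInteger();

    static ExecutorService create() {
        ThreadFactory factory = runnable -> {
            // Each worker is placed in GROUP and given a sequential, recognisable name.
            Thread thread = new Thread(GROUP, runnable, GROUP.getName() + "-" + COUNTER.incrementAndGet());
            thread.setDaemon(true); // do not keep the JVM alive for an idle worker
            return thread;
        };
        return Executors.newSingleThreadExecutor(factory);
    }
}
```

One difference from building a `ThreadPoolExecutor(1, 1, ...)` directly: the executor returned by `newSingleThreadExecutor` is documented as not reconfigurable to use additional threads.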



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110183160
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/thrift/IteratorCached.java ---
    @@ -0,0 +1,21 @@
    +package org.apache.jena.sparql.core.thrift;
    +
    +import java.util.Iterator;
    +
    +public abstract class IteratorCached<E> implements Iterator<E> {
    --- End diff --
    
    Since this class isn't actually caching the contents of the wrapped `Iterator`, perhaps a Javadoc comment explaining its semantics? Presumably subclasses are supposed to do caching of some kind (as below)?
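
One way the requested Javadoc and the split of responsibilities could look, as a sketch only. Both class names are hypothetical stand-ins and are not the PR's `IteratorCached` or `IteratorCachedArray`; the eager "drain into a list" strategy is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical stand-in for the reviewed base class. It only holds the wrapped
 * iterator; what to cache, and when, is left to subclasses.
 */
abstract class WrappedIteratorSketch<E> implements Iterator<E> {
    protected final Iterator<E> wrapped;

    protected WrappedIteratorSketch(Iterator<E> wrapped) {
        this.wrapped = wrapped;
    }
}

/** Hypothetical subclass that eagerly drains the wrapped iterator into a list. */
final class EagerCachedIteratorSketch<E> extends WrappedIteratorSketch<E> {
    private final Iterator<E> cached;

    EagerCachedIteratorSketch(Iterator<E> wrapped) {
        super(wrapped);
        List<E> buffer = new ArrayList<>();
        wrapped.forEachRemaining(buffer::add); // the source is fully consumed here
        this.cached = buffer.iterator();
    }

    @Override
    public boolean hasNext() {
        return cached.hasNext();
    }

    @Override
    public E next() {
        return cached.next();
    }
}
```

With this shape the base-class Javadoc can state plainly that no caching happens at that level, which is the point the comment raises.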



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by afs <gi...@git.apache.org>.
Github user afs commented on the issue:

    https://github.com/apache/jena/pull/233
  
    Looks interesting!
    
    My starting point is to understand what this is, trying to divide it between core functionality and the mosaic parts.



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    That seems reasonable to start with (blocking). That's what people would get if they were addressing the mosaic pieces directly.



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on the issue:

    https://github.com/apache/jena/pull/233
  
    I still need to commit the Thrift dataset graph classes (i.e. the iface and server), plus the test code, as the project was using a separate test mechanism...



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on the issue:

    https://github.com/apache/jena/pull/233
  
    @afs No, I haven't fooled with it at all because I didn't want to spend that time until @dick-twocows confirmed that it was ready for other eyes.
    Re: `StreamRDFTriHexTable` I didn't see that in `afs/jena:master` or in `afs/mantis:master`-- where is it?
    
    I'm certainly +1 to @afs's comments about it being better to have some new modules than more code in the core, although distributed operation is very important in the future, I think, and I could imagine this stuff migrating into the core at some point.
    
    @afs is asking for some clarity on how this stuff is laid out-- one way might be for @dick-twocows  to add package comments with a solid description in each of what that package does.



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110254192
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    Actually, I suppose you'd want to make replication possible as well as distribution. But maybe others will differ...



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110253257
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/DatasetGraphMosaic.java ---
    @@ -0,0 +1,589 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import static org.slf4j.LoggerFactory.getLogger;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Predicate;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +import org.apache.jena.graph.Graph;
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.shared.Lock;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.GraphView;
    +import org.apache.jena.sparql.core.Quad;
    +import org.apache.jena.sparql.core.thrift.IteratorCachedArray;
    +import org.apache.jena.sparql.util.Context;
    +import org.apache.jena.sparql.util.Symbol;
    +import org.slf4j.Logger;
    +
    +/**
    + * A DatasetGraph which distributes actions across a number of child DatasetGraph's.
    + * 
    + * As Jena requires Thread affinity when working with transactions this class uses ThreadProxy.
    + * 
    + * Most DatasetGraph methods call into a set of convenience methods which perform common tasks, i.e. mosaicIterator(Function<DatasetGraph, Iterator<T>>).
    + * 
    + * @author dick
    + *
    + */
    +public class DatasetGraphMosaic implements DatasetGraph {
    +	
    +	private static final Logger LOGGER = getLogger(DatasetGraphMosaic.class);
    +
    +	public static final String jenaID = UUID.randomUUID().toString();
    +	
    +	public static final Symbol MOSAIC_STREAM_SEQUENTIAL = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".mosaicStreamSequential");
    +
    +	public static final Symbol WRAP_ITERATOR = Symbol.create(DatasetGraphMosaic.class.getSimpleName() + ".wrapIterator");
    +
    +	protected final String id = UUID.randomUUID().toString();
    +	
    +	protected volatile Boolean closed = false;
    +	
    +	protected final Topology topology = new Topology();
    +	
    +	protected final Set<Tessera> mosaic;
    +	
    +	protected final ThreadLocal<TransactionalDistributed> transactional;
    +
    +	protected final WeakHashMap<Thread, TransactionalDistributed> monitor;
    +	
    +	protected final Lock lock;
    +	
    +	protected final DatasetGraphShimWrite shimWrite;
    +	
    +	protected final AtomicInteger readCount = new AtomicInteger();
    +
    +	protected final AtomicInteger writeCount = new AtomicInteger();
    +
    +	protected final AtomicInteger transactionCount = new AtomicInteger();
    +	
    +	protected final Context context;
    +	
    +	public DatasetGraphMosaic() {
    +		super();
    +		
    +		mosaic = ConcurrentHashMap.newKeySet(256);
    +		
    +		transactional = new ThreadLocal<>();
    +		
    +		monitor = new WeakHashMap<>(32);
    +		
    +		lock = new LockMRAndMW();
    +		
    +		shimWrite = new DatasetGraphShimWrite() {
    +			
    +			@Override
    +			public void add(final Node g, final Node s, final Node p, final Node o) {
    --- End diff --
    
    You're spot on: it doesn't, and this commit quietly drops the add! The shim is supposed to allow different modify implementations to be dropped in. Which child gets added to? That shim should be read-only (RO) and throw UnsupportedOperationException (UOE). This needs input from the Jena devs as to what is most appropriate.
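
A minimal sketch of a read-only shim that fails loudly instead of dropping writes. The interface below is a hypothetical stand-in for the PR's `DatasetGraphShimWrite`, whose actual methods are not shown in this thread, and plain strings stand in for Jena's `Node`.

```java
/** Hypothetical write-side interface; plain strings stand in for Jena's Node. */
interface WriteShimSketch {
    void add(String g, String s, String p, String o);

    void delete(String g, String s, String p, String o);
}

/** Read-only variant: every mutation throws instead of being silently ignored. */
final class ReadOnlyWriteShimSketch implements WriteShimSketch {

    @Override
    public void add(String g, String s, String p, String o) {
        throw new UnsupportedOperationException("add: this mosaic is read-only");
    }

    @Override
    public void delete(String g, String s, String p, String o) {
        throw new UnsupportedOperationException("delete: this mosaic is read-only");
    }
}
```

A writable policy (pick one child, replicate to all, and so on) could then be supplied as another implementation of the same interface, leaving the choice open for discussion.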



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110244425
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A Thread proxy which performs actions using one thread (the proxy).
    + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>).
    + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService.
    + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...)
    + * 
    + * @author dick
    + *
    + */
    +public class ThreadProxy {
    +
    +	private static final Logger LOGGER = LoggerFactory.getLogger(ThreadProxy.class);
    +	
    +	protected static final IDFactory ID_FACTORY = IDFactory.valueOf(ThreadProxy.class);
    +	
    +	protected static final ThreadGroup THREAD_GROUP = new ThreadGroup(ThreadProxy.class.getName());
    +	
    +	protected final String id;
    +	
    +	protected final String createdBy;
    +	
    +	protected final ExecutorService executorService;
    +	
    +	protected final AtomicInteger executeCount;
    +	
    +	protected final AtomicInteger submitCount;
    +	
    +	protected final AtomicInteger exceptionCount;
    +	
    +	public ThreadProxy() {
    +		super();
    +		id = ID_FACTORY.next();
    +		createdBy =  Thread.currentThread().getName();
    +		executorService = new ThreadPoolExecutor(1, 1,
    --- End diff --
    
    Yes, and to be able to close the executor. Also, the parallel streams in mosaic, via the transactional distributed, use a fork-join pool and create a thread proxy per dataset graph touched. Otherwise the system default fork-join pool gets used, and that limits the threads to the current processor count across all requests!
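
The fork-join point can be sketched as follows, assuming the widely used trick of starting the stream's terminal operation from inside a dedicated `ForkJoinPool`. That the stream's tasks then run in that pool rather than the common pool is observed JDK behaviour, not something the stream API documents as a guarantee. The class name and the `parallelism` parameter are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

final class DedicatedPoolSketch {

    /** Sums the squares of 1..n with a parallel stream started inside its own pool. */
    static int sumSquares(int n, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The terminal operation (sum) is invoked from a worker of this pool.
            Callable<Integer> task = () ->
                    IntStream.rangeClosed(1, n).parallel().map(i -> i * i).sum();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException("parallel computation failed", e);
        } finally {
            pool.shutdown(); // a dedicated pool has to be closed by its owner
        }
    }
}
```

For example, `DedicatedPoolSketch.sumSquares(10, 2)` returns 385, and concurrent requests that each use their own pool do not compete for the common pool's workers.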



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by ajs6f <gi...@git.apache.org>.
Github user ajs6f commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110013073
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/Topology.java ---
    @@ -0,0 +1,45 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
    +import org.apache.jena.sparql.util.Symbol;
    +
    +public class Topology {
    +
    +	protected static final Map<Symbol, Metric> ENTRIES;
    +	
    +	public static Symbol keyFor(final Class<? extends DatasetGraph> c) {
    +		return Symbol.create(c.getName());
    +	}
    +	
    +	static {
    +		final Map<Symbol, Metric> entries = new HashMap<>();
    +		
    +		entries.put(keyFor(DatasetGraph.class), new Metric());
    +		
    +		entries.put(keyFor(DatasetGraphInMemory.class), new Metric());
    --- End diff --
    
    Just a style thing, but `org.apache.jena.ext.com.google.common.collect.ImmutableMap.of(K, V, K, V)` would let you build this right in the field declaration, no `static` stanza needed.
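
A sketch of the shape being suggested. To stay runnable with only the JDK it swaps in `java.util.Map.of` (Java 9 or later) for the shaded Guava `ImmutableMap.of` named above; string keys and the empty `Metric` class are hypothetical placeholders for the PR's `Symbol` and `Metric`.

```java
import java.util.Map;

final class TopologySketch {

    /** Hypothetical placeholder for the PR's Metric class. */
    static final class Metric { }

    // Built directly in the field declaration; no static initializer block needed.
    static final Map<String, Metric> ENTRIES = Map.of(
            "org.apache.jena.sparql.core.DatasetGraph", new Metric(),
            "org.apache.jena.sparql.core.mem.DatasetGraphInMemory", new Metric());
}
```

Like `ImmutableMap`, the map returned by `Map.of` rejects later modification, so the field cannot be changed by accident after class initialization.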



[GitHub] jena pull request #233: Added mosaic and thrift packages to org.apache.jena....

Posted by dick-twocows <gi...@git.apache.org>.
Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110253625
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A Thread proxy which performs actions using one thread (the proxy).
    + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>).
    + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService.
    + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...)
    + * 
    + * @author dick
    + *
    + */
    +public class ThreadProxy {
    --- End diff --
    
    What it was might have got lost in the iterations to its current form; let me check...



[GitHub] jena issue #233: Added mosaic and thrift packages to org.apache.jena.sparql....

Posted by afs <gi...@git.apache.org>.
Github user afs commented on the issue:

    https://github.com/apache/jena/pull/233
  
    `StreamRDFTriHexTable` is on `dick-twocows:master` and in the PR.

