You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2022/09/08 12:16:56 UTC
[jena] branch main updated: GH-1517: Fixes bulk requests becoming cut off and wires up bulk query parameters with assembler
This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new 23c6cee45f GH-1517: Fixes bulk requests becoming cut off and wires up bulk query parameters with assembler
new 5f49905ca1 Merge pull request #1518 from Aklakan/GH-1517
23c6cee45f is described below
commit 23c6cee45f83937783deba50d0fd6565e28aa4f9
Author: Claus Stadler <Ra...@googlemail.com>
AuthorDate: Tue Sep 6 01:05:14 2022 +0200
GH-1517: Fixes bulk requests becoming cut off and wires up bulk query parameters with assembler
---
.../assembler/DatasetAssemblerServiceEnhancer.java | 24 +++
.../enhancer/assembler/ServiceEnhancerVocab.java | 24 ++-
.../jena/sparql/service/enhancer/impl/Batcher.java | 218 +++++++++++++++++++++
.../impl/ChainingServiceExecutorBulkCache.java | 11 +-
...ChainingServiceExecutorBulkServiceEnhancer.java | 4 +-
.../enhancer/impl/QueryIterServiceBulk.java | 29 ++-
.../service/enhancer/impl/RequestScheduler.java | 212 --------------------
.../enhancer/init/ServiceEnhancerConstants.java | 5 +-
.../TestDatasetAssemblerServiceEnhancer.java | 26 ++-
.../sparql/service/enhancer/impl/TestBatcher.java | 141 +++++++++++++
10 files changed, 447 insertions(+), 247 deletions(-)
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java
index 4e39f80ecf..79cb455862 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java
@@ -20,6 +20,7 @@ package org.apache.jena.sparql.service.enhancer.assembler;
import java.util.Objects;
+import org.apache.commons.lang3.function.TriFunction;
import org.apache.jena.assembler.Assembler;
import org.apache.jena.assembler.exceptions.AssemblerException;
import org.apache.jena.atlas.logging.Log;
@@ -28,16 +29,19 @@ import org.apache.jena.graph.Node;
import org.apache.jena.query.ARQ;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.DatasetGraphWrapper;
import org.apache.jena.sparql.core.assembler.DatasetAssembler;
+import org.apache.jena.sparql.service.enhancer.impl.ChainingServiceExecutorBulkCache;
import org.apache.jena.sparql.service.enhancer.impl.ServiceResponseCache;
import org.apache.jena.sparql.service.enhancer.impl.util.GraphUtilsExtra;
import org.apache.jena.sparql.service.enhancer.init.ServiceEnhancerConstants;
import org.apache.jena.sparql.service.enhancer.init.ServiceEnhancerInit;
import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.Symbol;
import org.apache.jena.sparql.util.graph.GraphUtils;
/**
@@ -87,6 +91,16 @@ public class DatasetAssemblerServiceEnhancer
ServiceResponseCache.set(cxt, cache);
}
+ // Transfer values from the RDF model to the context
+ configureCxt(root, ServiceEnhancerVocab.bulkMaxSize, cxt, ServiceEnhancerConstants.serviceBulkMaxBindingCount,
+ false, ChainingServiceExecutorBulkCache.DFT_MAX_BULK_SIZE, GraphUtilsExtra::getAsInt);
+
+ configureCxt(root, ServiceEnhancerVocab.bulkSize, cxt, ServiceEnhancerConstants.serviceBulkBindingCount,
+ false, ChainingServiceExecutorBulkCache.DFT_BULK_SIZE, GraphUtilsExtra::getAsInt);
+
+ configureCxt(root, ServiceEnhancerVocab.bulkMaxOutOfBandSize, cxt, ServiceEnhancerConstants.serviceBulkMaxOutOfBandBindingCount,
+ false, ChainingServiceExecutorBulkCache.DFT_MAX_OUT_OUF_BAND_SIZE, GraphUtilsExtra::getAsInt);
+
// If management is enabled then return a wrapped dataset with a copy of the context which has
// mgmt enabled
if (enableMgmt) {
@@ -103,4 +117,14 @@ public class DatasetAssemblerServiceEnhancer
return result.asDatasetGraph();
}
+
+
+ /** Transfer a resource's property value to a context symbol's value */
+ private static <T> void configureCxt(Resource root, Property property, Context cxt, Symbol symbol, boolean applyDefaultValueIfPropertyAbsent, T defaultValue, TriFunction<Resource, Property, T, T> getValue) {
+ if (root.hasProperty(property) || applyDefaultValueIfPropertyAbsent) {
+ Object value = getValue.apply(root, property, defaultValue);
+ cxt.set(symbol, value);
+ }
+
+ }
}
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java
index 11444de2e6..d298e7b84b 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java
@@ -34,23 +34,31 @@ public class ServiceEnhancerVocab {
public static final Resource DatasetServiceEnhancer = ResourceFactory.createResource(NS + "DatasetServiceEnhancer");
/** The id (a node) to which to resolve urn:x-arq:self */
- public static final Property datasetId = ResourceFactory.createProperty(NS + "datasetId");
+ public static final Property datasetId = ResourceFactory.createProperty(NS + "datasetId");
/** Enable privileged management functions; creates a wrapped dataset with a copied context */
- public static final Property enableMgmt = ResourceFactory.createProperty(NS + "enableMgmt");
+ public static final Property enableMgmt = ResourceFactory.createProperty(NS + "enableMgmt");
- // The term "baseDataset" is not officially in ja but it seems reasonable to eventually add it there
- // (so far ja only defines baseModel)
- public static final Property baseDataset = ResourceFactory.createProperty(JA.getURI() + "baseDataset");
+ /** The term "baseDataset" is not officially in ja but it seems reasonable to eventually add it there.
+ * So far ja only defines baseModel */
+ public static final Property baseDataset = ResourceFactory.createProperty(JA.getURI() + "baseDataset");
/** Maximum number of entries the service cache can hold */
- public static final Property cacheMaxEntryCount = ResourceFactory.createProperty(NS + "cacheMaxEntryCount") ;
+ public static final Property cacheMaxEntryCount = ResourceFactory.createProperty(NS + "cacheMaxEntryCount");
/** Number number of pages for bindings an individual cache entry can hold */
- public static final Property cacheMaxPageCount = ResourceFactory.createProperty(NS + "cacheMaxPageCount") ;
+ public static final Property cacheMaxPageCount = ResourceFactory.createProperty(NS + "cacheMaxPageCount");
/** Number of bindings a page can hold */
- public static final Property cachePageSize = ResourceFactory.createProperty(NS + "cachePageSize") ;
+ public static final Property cachePageSize = ResourceFactory.createProperty(NS + "cachePageSize");
+
+ /** Maximum size (in terms of input bindings) of bulk requests */
+ public static final Property bulkMaxSize = ResourceFactory.createProperty(NS + "bulkMaxSize");
+
+ /** Bulk size to use if no other is set. Capped by bulkMaxSize. */
+ public static final Property bulkSize = ResourceFactory.createProperty(NS + "bulkSize");
+
+ public static final Property bulkMaxOutOfBandSize = ResourceFactory.createProperty(NS + "bulkMaxOutOfBandSize");
/** Adds the following prefix declarations to the given map thereby overrides existing ones:
* <table style="border: 1px solid;">
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/Batcher.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/Batcher.java
new file mode 100644
index 0000000000..5df9a41328
--- /dev/null
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/Batcher.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.service.enhancer.impl;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.ext.com.google.common.collect.AbstractIterator;
+
+/**
+ * The batcher transforms an iterator of input items into an iterator of batches.
+ * Thereby, items are assigned incremental ids.
+ * Every returned batch will start with the lowest item index not covered by any prior batch.
+ *
+ * Parameters are:
+ * <ul>
+ * <li>maxBulkSize: The maximum size of the batches which to form</li>
+ * <li>maxOutOfBandItemCount: The maximum number of out-of-band-items when forming the next batch.
+ * Once this threshold is reached then a batch is returned even if it wasn't full yet.</li>
+ * </ul>
+ *
+ * A batch is guaranteed to have at least one item.
+ *
+ * @param <G> group key type (e.g. type of service IRIs)
+ * @param <I> item type (e.g. Binding)
+ */
+public class Batcher<G, I> {
+ /** The maximum size of the batches formed by this class */
+ protected int maxBulkSize;
+
+ /** Allow up to this number of out-of-band-items when forming the next batch.
+ * Once this threshold is reached then a batch is returned even if it wasn't full yet.
+ */
+ protected int maxOutOfBandItemCount;
+
+ /** Function to map an input item to a group key*/
+ protected Function<I, G> itemToGroupKey;
+
+ public Batcher(Function<I, G> itemToGroupKey, int maxBulkSize, int maxOutOfBandItemCount) {
+ super();
+ this.itemToGroupKey = itemToGroupKey;
+ this.maxBulkSize = maxBulkSize;
+ this.maxOutOfBandItemCount = maxOutOfBandItemCount;
+ }
+
+ public IteratorCloseable<GroupedBatch<G, Long, I>> batch(IteratorCloseable<I> inputIterator) {
+ return new IteratorGroupedBatch(inputIterator);
+ }
+
+ class IteratorGroupedBatch
+ extends AbstractIterator<GroupedBatch<G, Long, I>> implements IteratorCloseable<GroupedBatch<G, Long, I>>
+ {
+ protected IteratorCloseable<I> inputIterator;
+
+ /** The position of the inputIterator */
+ protected long inputIteratorOffset;
+
+ // Offsets of the group keys
+ protected NavigableMap<Long, G> nextGroup = new TreeMap<>();
+
+ // For each group key, the inner navigable map is keyed by the lowest offset of each of that group's batches
+ protected Map<G, NavigableMap<Long, Batch<Long, I>>> groupToBatches = new HashMap<>();
+
+ public IteratorGroupedBatch(IteratorCloseable<I> inputIterator) {
+ this(inputIterator, 0);
+ }
+
+ public IteratorGroupedBatch(IteratorCloseable<I> inputIterator, int inputIteratorOffset) {
+ super();
+ this.inputIterator = inputIterator;
+ this.inputIteratorOffset = inputIteratorOffset;
+ }
+
+ @Override
+ protected GroupedBatch<G, Long, I> computeNext() {
+ // For the current result group key and corresponding batch determine how many out-of-band
+ // items we have already consumed from the input iterator
+ // Any item that does not contribute to the current result batch counts as out-of-band
+
+ // The key of the first pending batch - null if there is none yet
+ Optional<G> resultGroupKeyOpt = Optional.ofNullable(nextGroup.firstEntry()).map(Entry::getValue);
+ G resultGroupKey = resultGroupKeyOpt.orElse(null);
+
+ Optional<Entry<Long, Batch<Long, I>>> resultBatchEntry = resultGroupKeyOpt
+ .map(groupToBatches::get).map(offsetToBatch -> offsetToBatch.firstEntry());
+
+ long resultBatchMinOffset = resultBatchEntry.map(Entry::getKey).orElse(inputIteratorOffset);
+
+ // The result batch - empty if there is none
+ Optional<Batch<Long, I>> resultBatch = resultBatchEntry.map(Entry::getValue);
+
+ // Get the highest offset of the result batch (if there is one)
+ // If a new batch has yet to be created its item ids start with inputIteratorOffset
+ long resultBatchMaxOffset = resultBatch.map(Batch::getNextValidIndex).orElse(inputIteratorOffset);
+
+ int resultBatchSize = resultBatch.map(Batch::getItems).map(Map::size).orElse(0);
+
+ long outOfBandItemCountInResultBatch = resultBatchMaxOffset - resultBatchMinOffset - resultBatchSize;
+ long outOfBandItemCountInOtherBatches = inputIteratorOffset - resultBatchMaxOffset;
+
+ // The outOfBandItemCount may be counted up in the loop below
+ long outOfBandItemCount = outOfBandItemCountInResultBatch + outOfBandItemCountInOtherBatches;
+
+ // The following four variables are re-initialized whenever the groupKey changes
+ G previousGroupKey = null; // The input iterator must never yield a null group key
+ Batch<Long, I> currentBatch = null;
+ NavigableMap<Long, Batch<Long, I>> currentBatches = null;
+ int currentBatchSize = -1;
+
+ // Only look at the input iterator if the current batch is not yet full
+ if (resultBatchSize < maxBulkSize) {
+ while (inputIterator.hasNext() && outOfBandItemCount <= maxOutOfBandItemCount) {
+ I input = inputIterator.next();
+ G currentGroupKey = itemToGroupKey.apply(input);
+ Objects.requireNonNull(currentGroupKey); // Sanity check
+
+ if (!Objects.equals(currentGroupKey, previousGroupKey)) {
+ previousGroupKey = currentGroupKey;
+
+ currentBatches = groupToBatches.computeIfAbsent(currentGroupKey, x -> new TreeMap<>());
+ if (currentBatches.isEmpty()) {
+ currentBatch = BatchImpl.forLong();
+ currentBatches.put(inputIteratorOffset, currentBatch);
+ nextGroup.put(inputIteratorOffset, currentGroupKey);
+ } else {
+ currentBatch = currentBatches.lastEntry().getValue();
+ }
+ currentBatchSize = currentBatch.size();
+
+ if (resultGroupKey == null) {
+ resultGroupKey = currentGroupKey;
+ }
+ }
+
+ // Add the item to the current batch if it is not full
+ // Otherwise start a new batch
+ Batch<Long, I> insertTargetBatch;
+ if (currentBatchSize >= maxBulkSize) {
+ insertTargetBatch = BatchImpl.forLong();
+ currentBatches.put(inputIteratorOffset, insertTargetBatch);
+ nextGroup.put(inputIteratorOffset, currentGroupKey);
+ } else {
+ insertTargetBatch = currentBatch;
+ // We are just about to add an item to the current batch
+ ++currentBatchSize;
+ }
+ insertTargetBatch.put(inputIteratorOffset, input);
+ ++inputIteratorOffset;
+
+ // If we just completely filled up the result batch then break
+ if (currentGroupKey.equals(resultGroupKey)) {
+ if (currentBatchSize >= maxBulkSize) {
+ break;
+ }
+ } else {
+ ++outOfBandItemCount;
+ }
+
+ // Update the state if we started a new batch (because the current one was full)
+ if (insertTargetBatch != currentBatch) {
+ currentBatch = insertTargetBatch;
+ currentBatchSize = insertTargetBatch.size();
+ }
+ }
+ }
+
+ // Return and remove the first batch from our data structures
+ GroupedBatch<G, Long, I> result;
+ Iterator<Entry<Long, G>> nextGroupIt = nextGroup.entrySet().iterator();
+ if (nextGroupIt.hasNext()) {
+ Entry<Long, G> e = nextGroupIt.next();
+ resultGroupKey = e.getValue();
+ nextGroupIt.remove();
+ // nextInputId = e.getKey();
+
+ NavigableMap<Long, Batch<Long, I>> nextBatches = groupToBatches.get(resultGroupKey);
+ Iterator<Batch<Long, I>> nextBatchesIt = nextBatches.values().iterator();
+ Batch<Long, I> resultBatchTmp = nextBatchesIt.next();
+ nextBatchesIt.remove();
+
+ result = new GroupedBatchImpl<>(resultGroupKey, resultBatchTmp);
+ } else {
+ result = endOfData();
+ }
+ return result;
+ }
+
+ @Override
+ public void close() {
+ Iter.close(inputIterator);
+ }
+ }
+}
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java
index 7c385cb678..b263c0aa09 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java
@@ -36,9 +36,9 @@ import org.apache.jena.sparql.util.Context;
public class ChainingServiceExecutorBulkCache
implements ChainingServiceExecutorBulk {
- public static final int DEFAULT_BULK_SIZE = 30;
- public static final int MAX_BULK_SIZE = 100;
- public static final int DEFAULT_MAX_BYTE_SIZE = 5000;
+ public static final int DFT_BULK_SIZE = 30;
+ public static final int DFT_MAX_BULK_SIZE = 100;
+ public static final int DFT_MAX_OUT_OUF_BAND_SIZE = 30;
protected int bulkSize;
protected CacheMode cacheMode;
@@ -67,8 +67,9 @@ public class ChainingServiceExecutorBulkCache
OpServiceExecutorImpl opExecutor = new OpServiceExecutorImpl(serviceInfo.getOpService(), execCxt, chain);
- RequestScheduler<Node, Binding> scheduler = new RequestScheduler<>(serviceInfo::getSubstServiceNode, bulkSize);
- IteratorCloseable<GroupedBatch<Node, Long, Binding>> inputBatchIterator = scheduler.group(input);
+ int maxOutOfBandItemCount = cxt.getInt(ServiceEnhancerConstants.serviceBulkMaxOutOfBandBindingCount, DFT_MAX_OUT_OUF_BAND_SIZE);
+ Batcher<Node, Binding> scheduler = new Batcher<>(serviceInfo::getSubstServiceNode, bulkSize, maxOutOfBandItemCount);
+ IteratorCloseable<GroupedBatch<Node, Long, Binding>> inputBatchIterator = scheduler.batch(input);
RequestExecutor exec = new RequestExecutor(opExecutor, serviceInfo, resultSizeCache, serviceCache, cacheMode, inputBatchIterator);
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java
index 4db5816e67..f3c1e91a54 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java
@@ -76,8 +76,8 @@ public class ChainingServiceExecutorBulkServiceEnhancer
case ServiceOpts.SO_BULK: // Enables bulk requests
enableBulk = true;
- int maxBulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkMaxBindingCount, ChainingServiceExecutorBulkCache.MAX_BULK_SIZE);
- bulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkBindingCount, ChainingServiceExecutorBulkCache.DEFAULT_BULK_SIZE);
+ int maxBulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkMaxBindingCount, ChainingServiceExecutorBulkCache.DFT_MAX_BULK_SIZE);
+ bulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkBindingCount, ChainingServiceExecutorBulkCache.DFT_BULK_SIZE);
try {
if (val == null || val.isBlank()) {
// Ignored
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java
index 695236aeb2..3df6bec0df 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java
@@ -33,7 +33,6 @@ import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.atlas.iterator.IteratorOnClose;
import org.apache.jena.atlas.lib.Closeable;
-import org.apache.jena.atlas.logging.Log;
import org.apache.jena.ext.com.google.common.collect.Iterators;
import org.apache.jena.ext.com.google.common.collect.Range;
import org.apache.jena.ext.com.google.common.collect.RangeMap;
@@ -71,6 +70,8 @@ import org.apache.jena.sparql.service.enhancer.slice.api.ReadableChannelWithLimi
import org.apache.jena.sparql.service.enhancer.slice.api.Slice;
import org.apache.jena.sparql.service.enhancer.slice.api.SliceAccessor;
import org.apache.jena.sparql.util.NodeFactoryExtra;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* QueryIter to process service requests in bulk with support for streaming caching.
@@ -81,6 +82,8 @@ import org.apache.jena.sparql.util.NodeFactoryExtra;
public class QueryIterServiceBulk
extends QueryIterSlottedBase
{
+ private static final Logger logger = LoggerFactory.getLogger(QueryIterServiceBulk.class);
+
protected OpServiceInfo serviceInfo;
protected ServiceCacheKeyFactory cacheKeyFactory;
@@ -196,11 +199,11 @@ public class QueryIterServiceBulk
boolean isBackendIt = sliceKeysForBackend.contains(partKey);
if (isBackendIt && !activeIt.hasNext()) {
- Log.debug(QueryIterServiceBulk.class, "Iterator ended without end marker - assuming remote result set limit reached");
+ logger.debug("Iterator ended without end marker - assuming remote result set limit reached");
long seenBackendData = backendIt.getOffset();
backendResultSetLimit = new Estimate<>(seenBackendData, true);
if (seenBackendData <= 0) {
- Log.warn(QueryIterServiceBulk.class, "Known result set limit of " + seenBackendData + " detected");
+ logger.warn("Known result set limit of " + seenBackendData + " detected");
}
resultSizeCache.updateLimit(targetService, backendResultSetLimit);
@@ -470,7 +473,10 @@ public class QueryIterServiceBulk
int nextAllocOutputId = 0;
int batchSize = inputs.size();
- Log.info(QueryIterServiceBulk.class, "Schedule for current batch:");
+ if (logger.isInfoEnabled()) {
+ logger.info("Schedule for current batch:");
+ }
+
int rangeId = currentRangeId;
for (int inputId = currentInputId; inputId < batchSize; ++inputId) {
@@ -508,7 +514,9 @@ public class QueryIterServiceBulk
lock = slice.getReadWriteLock().readLock();
- Log.debug(QueryIterServiceBulk.class, "Created cache key: " + cacheKey);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Created cache key: " + cacheKey);
+ }
// Log.debug(BatchRequestIterator.class, "Cached ranges: " + slice.getLoadedRanges().toString());
lock.lock();
@@ -577,11 +585,12 @@ public class QueryIterServiceBulk
// - We need to start the backend request from the request offset
// - The issue is how to handle the next binding
- Log.info(QueryIterServiceBulk.class, "input " + inputId + ": " +
- allRanges.toString()
- .replace("false", "fetch")
- .replace("true", "cached"));
-
+ if (logger.isInfoEnabled()) {
+ logger.info("input " + inputId + ": " +
+ allRanges.toString()
+ .replace("false", "fetch")
+ .replace("true", "cached"));
+ }
Map<Range<Long>, Boolean> mapOfRanges = allRanges.asMapOfRanges();
if (mapOfRanges.isEmpty()) {
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/RequestScheduler.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/RequestScheduler.java
deleted file mode 100644
index c09a06a1f0..0000000000
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/RequestScheduler.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.service.enhancer.impl;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.function.Function;
-
-import org.apache.jena.atlas.iterator.Iter;
-import org.apache.jena.atlas.iterator.IteratorCloseable;
-import org.apache.jena.ext.com.google.common.collect.AbstractIterator;
-
-/**
- * Accumulates items from an input iterator into batches.
- * Every returned batch will start with the first item index not covered by any prior batch.
- *
- * Parameters are:
- * <ul>
- * <li>maxBulkSize: The maximum number of items allowed in a batch returned by a call to next()</li>
- * <li>maxeReadAhead: The maximum number of items allowed to read from the input iterator in order to fill a batch</li>
- * <li>maxInputDistance: The index of items w.r.t. to the input iterator in a batch must not be farther apart than this distance</li>
- * </ul>
- *
- * A batch is guaranteed to have at least one item.
- * If any of the thresholds is exceeded a batch will have fewer items that its maximum allowed size.
- *
- *
- * @param <G> group key type (e.g. service IRI)
- * @param <I> item type (e.g. Binding)
- */
-public class RequestScheduler<G, I> {
- protected int maxBulkSize;
-
- /** Allow reading at most this number of items ahead for the input iterator to completely fill
- * the batch request for the next response */
- protected int maxReadAhead = 300;
-
- /** Do not group inputs into the same batch if their ids are this (or more) of that amount apart */
- protected int maxInputDistance = 50;
-
- // protected Iterator<I> inputIterator;
- protected Function<I, G> inputToGroup;
-
- public RequestScheduler(Function<I, G> inputToGroup, int maxBulkSize) {
- super();
- this.inputToGroup = inputToGroup;
- this.maxBulkSize = maxBulkSize;
- }
-
- public IteratorCloseable<GroupedBatch<G, Long, I>> group(IteratorCloseable<I> inputIterator) {
- return new Grouper(inputIterator);
- }
-
- class Grouper
- extends AbstractIterator<GroupedBatch<G, Long, I>> implements IteratorCloseable<GroupedBatch<G, Long, I>>
- {
- protected IteratorCloseable<I> inputIterator;
-
- /** The position of the inputIterator */
- protected long inputIteratorOffset;
-
- /** The offset of the next item being returned */
- protected long nextResultOffset;
-
- protected long nextInputId;
-
- // the outer navigable map has to lowest offset of the batch
- protected Map<G, NavigableMap<Long, Batch<Long, I>>> groupToBatches = new HashMap<>();
-
- // Offsets of the group keys
- protected NavigableMap<Long, G> nextGroup = new TreeMap<>();
-
- public Grouper(IteratorCloseable<I> inputIterator) {
- this(inputIterator, 0);
- }
-
- public Grouper(IteratorCloseable<I> inputIterator, int inputIteratorOffset) {
- super();
- this.inputIterator = inputIterator;
- this.inputIteratorOffset = inputIteratorOffset;
- this.nextResultOffset = inputIteratorOffset;
- }
-
- @Override
- protected GroupedBatch<G, Long, I> computeNext() {
- G resultGroupKey = Optional.ofNullable(nextGroup.firstEntry()).map(Entry::getValue).orElse(null);
- G lastGroupKey = null;
-
- // Cached references
- NavigableMap<Long, Batch<Long, I>> batches = null;
- Batch<Long, I> batch = null;
-
- while (inputIterator.hasNext() && inputIteratorOffset - nextResultOffset < maxReadAhead) {
- I input = inputIterator.next();
- G groupKey = inputToGroup.apply(input);
-
- if (!Objects.equals(groupKey, lastGroupKey)) {
- lastGroupKey = groupKey;
-
- if (resultGroupKey == null) {
- resultGroupKey = groupKey;
- }
-
- batches = groupToBatches.computeIfAbsent(groupKey, x -> new TreeMap<>());
- if (batches.isEmpty()) {
- batch = BatchImpl.forLong();
- batches.put(inputIteratorOffset, batch);
- nextGroup.put(inputIteratorOffset, groupKey);
- } else {
- batch = batches.lastEntry().getValue();
- }
- }
-
- // Check whether we need to start a new request
- // Either because the batch is full or the differences between the input ids is too great
- long batchEndOffset = batch.getNextValidIndex();
- long distance = nextInputId - batchEndOffset;
-
- // If the item is consecutive add it to the list
- int batchSize = batch.size();
- if (distance > maxInputDistance || batchSize >= maxBulkSize) {
- batch = BatchImpl.forLong();
- batches.put(inputIteratorOffset, batch);
- }
- batch.put(inputIteratorOffset, input);
- ++inputIteratorOffset;
-
- // If the batch of the result group just became full then break
- if (groupKey.equals(resultGroupKey) && batchSize + 1 >= maxBulkSize) {
- break;
- }
- }
-
- // Return and remove the first batch from our data structures
-
- GroupedBatch<G, Long, I> result;
- Iterator<Entry<Long, G>> nextGroupIt = nextGroup.entrySet().iterator();
- if (nextGroupIt.hasNext()) {
- Entry<Long, G> e = nextGroupIt.next();
- resultGroupKey = e.getValue();
- nextGroupIt.remove();
- nextInputId = e.getKey();
-
- NavigableMap<Long, Batch<Long, I>> nextBatches = groupToBatches.get(resultGroupKey);
- Iterator<Batch<Long, I>> nextBatchesIt = nextBatches.values().iterator();
- Batch<Long, I> resultBatch = nextBatchesIt.next();
- nextBatchesIt.remove();
-
- result = new GroupedBatchImpl<>(resultGroupKey, resultBatch);
- } else {
- result = endOfData();
- }
- return result;
- }
-
- @Override
- public void close() {
- Iter.close(inputIterator);
- }
- }
-
-//
-// public static void main(String[] args) {
-// Var v = Var.alloc("v");
-// Iterator<Binding> individualIt = IntStream.range(0, 10)
-// .mapToObj(x -> BindingFactory.binding(v, NodeValue.makeInteger(x).asNode()))
-// .iterator();
-//
-// Op op = Algebra.compile(QueryFactory.create("SELECT * { ?v ?p ?o }"));
-// OpService opService = new OpService(v, op, false);
-// OpServiceInfo serviceInfo = new OpServiceInfo(opService);
-//
-//
-// RequestScheduler<Node, Binding> scheduler = new RequestScheduler<>(b ->
-// NodeFactory.createLiteral("group" + (NodeValue.makeNode(b.get(v)).getInteger().intValue() % 3)), 2);
-// Iterator<ServiceBatchRequest<Node, Binding>> batchIt = scheduler.group(individualIt);
-//
-// OpServiceExecutorImpl opExecutor = null;
-//
-// RequestExecutor executor = new RequestExecutor(opExecutor, serviceInfo, batchIt);
-// // executor.exec();
-//
-//// while (batchIt.hasNext()) {
-//// System.out.println(batchIt.next());
-//// }
-//
-//
-// }
-//
-}
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java
index 76d3f5a69d..5c08373ead 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java
@@ -36,8 +36,11 @@ public class ServiceEnhancerConstants {
/** Maximum number of bindings to group into a single bulk request; restricts serviceBulkRequestItemCount */
public static final Symbol serviceBulkMaxBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkMaxBindingCount") ;
+ /** Maximum number of out-of-band bindings that can be skipped over when forming an individual bulk request */
+ public static final Symbol serviceBulkMaxOutOfBandBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkMaxOutOfBandBindingCount") ;
+
/** Number of bindings to group into a single bulk request */
- public static final Symbol serviceBulkBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkMaxBindingCount") ;
+ public static final Symbol serviceBulkBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkBindingCount") ;
/** Symbol for the cache of services' result sets */
public static final Symbol serviceCache = SystemARQ.allocSymbol(NS, "serviceCache") ;
diff --git a/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java
index 321d4475a9..e9aea003c1 100644
--- a/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java
+++ b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java
@@ -30,31 +30,39 @@ import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.sparql.service.enhancer.init.ServiceEnhancerConstants;
+import org.apache.jena.sparql.util.Context;
import org.apache.jena.vocabulary.RDF;
import org.junit.Assert;
import org.junit.Test;
public class TestDatasetAssemblerServiceEnhancer
{
- /**
- * This test case attempts to assemble a dataset with the service enhancer plugin
- * set up in its context. A query making use of enhancer features is fired against it.
- * Only if the plugin is loaded successfully then the query will execute successfully.
- */
- @Test
- public void testAssembler() {
- String specStr = String.join("\n",
+ private static final String SPEC_STR_01 = String.join("\n",
"PREFIX ja: <http://jena.hpl.hp.com/2005/11/Assembler#>",
"PREFIX se: <http://jena.apache.org/service-enhancer#>",
"<urn:example:root> a se:DatasetServiceEnhancer ; ja:baseDataset <urn:example:base> .",
"<urn:example:root> se:cacheMaxEntryCount 5 ; se:cachePageSize 1000 ; se:cacheMaxPageCount 10 .",
+ "<urn:example:root> se:bulkMaxSize 20 ; se:bulkSize 10 ; se:bulkMaxOutOfBandSize 5 .",
"<urn:example:base> a ja:MemoryDataset ."
);
+ /**
+ * This test case attempts to assemble a dataset with the service enhancer plugin
+ * set up in its context. A query making use of enhancer features is fired against it.
+ * The query can only execute successfully if the plugin has been loaded successfully.
+ */
+ @Test
+ public void testAssembler() {
Model spec = ModelFactory.createDefaultModel();
- RDFDataMgr.read(spec, new StringReader(specStr), null, Lang.TURTLE);
+ RDFDataMgr.read(spec, new StringReader(SPEC_STR_01), null, Lang.TURTLE);
Dataset dataset = DatasetFactory.assemble(spec.getResource("urn:example:root"));
+ Context cxt = dataset.getContext();
+
+ Assert.assertEquals(20, cxt.getInt(ServiceEnhancerConstants.serviceBulkMaxBindingCount, -1));
+ Assert.assertEquals(10, cxt.getInt(ServiceEnhancerConstants.serviceBulkBindingCount, -1));
+ Assert.assertEquals(5, cxt.getInt(ServiceEnhancerConstants.serviceBulkMaxOutOfBandBindingCount, -1));
try (QueryExecution qe = QueryExecutionFactory.create(
"SELECT * { BIND(<urn:example:x> AS ?x) SERVICE <loop:bulk+10:> { ?x ?y ?z } }", dataset)) {
diff --git a/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/impl/TestBatcher.java b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/impl/TestBatcher.java
new file mode 100644
index 0000000000..e5e53dd60d
--- /dev/null
+++ b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/impl/TestBatcher.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.service.enhancer.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.ext.com.google.common.collect.Streams;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for {@link Batcher}: grouping of input items into size-bounded batches per group key. */
+public class TestBatcher {
+    private static final List<Entry<String, Integer>> testData01 = List.<Entry<String, Integer>>of(
+            Map.entry("a", 0),
+            Map.entry("a", 1),
+            Map.entry("b", 2),
+            Map.entry("b", 3),
+            Map.entry("b", 4),
+            Map.entry("a", 5),
+            Map.entry("c", 6),
+            Map.entry("a", 7),
+            Map.entry("c", 8),
+            Map.entry("c", 9),
+            Map.entry("d", 10),
+            Map.entry("d", 11),
+            Map.entry("c", 12),
+            Map.entry("c", 13),
+            Map.entry("d", 14),
+            Map.entry("e", 15));
+
+    /** Test that grouping items correctly considers maxBatchSize and maxOutOfBandItemCount.
+     * Batch size = 3; up to two out-of-band items are allowed to form a batch.
+     */
+    @Test
+    public void testBatcher_01_3_2() {
+        // The numbers below refer to the value-component of the test data
+        List<List<Integer>> expectedBatchIds = List.<List<Integer>>of(
+                List.of(0, 1),
+                List.of(2, 3, 4),
+                List.of(5, 7),
+                List.of(6, 8, 9),
+                List.of(10, 11, 14),
+                List.of(12, 13),
+                List.of(15)
+        );
+        eval(testData01, 3, 2, expectedBatchIds);
+    }
+
+    /** Form batches up to size 3 allowing 0 out-of-band items; every change in the group id starts a new batch */
+    @Test
+    public void testBatcher_01_3_0() {
+        // The numbers below refer to the value-component of the test data
+        List<List<Integer>> expectedBatchIds = List.<List<Integer>>of(
+                List.of(0, 1),
+                List.of(2, 3, 4),
+                List.of(5),
+                List.of(6),
+                List.of(7),
+                List.of(8, 9),
+                List.of(10, 11),
+                List.of(12, 13),
+                List.of(14),
+                List.of(15)
+        );
+        eval(testData01, 3, 0, expectedBatchIds);
+    }
+
+    /** Batches {@code input} using the entry key as group key and asserts that the values of the emitted batches equal {@code expectedBatchIds}. */
+    public static void eval(
+            List<Entry<String, Integer>> input,
+            int maxBatchSize,
+            int maxOutOfBandItemCount,
+            List<List<Integer>> expectedBatchIds) {
+        IteratorCloseable<GroupedBatch<String, Long, Entry<String, Integer>>> it = new Batcher<String, Entry<String, Integer>>
+                (Entry::getKey, maxBatchSize, maxOutOfBandItemCount).batch(Iter.iter(input.iterator()));
+        // For each obtained batch extract the list of values
+        List<List<Integer>> actualBatchIds = Streams.stream(it)
+                .map(groupedBatch -> groupedBatch.getBatch().getItems().values().stream().map(Entry::getValue).collect(Collectors.toList()))
+                .collect(Collectors.toList());
+
+        Assert.assertEquals(expectedBatchIds, actualBatchIds);
+    }
+
+    /** This test creates random input, batches it and checks that the number of batched items matches that of the input */
+    @Test
+    public void testBatcher_largeInput() {
+        int expectedItemCount = 10000;
+        int runLengthLimit = 10; // Exclusive upper bound for the length of a generated run of items sharing one group
+        int maxBatchSize = 4; // Batch size limit handed to the Batcher; the sanity check below verifies against it
+        int maxGroupCount = 10;
+        Random rand = new Random(0);
+
+        List<Entry<String, Integer>> testData = new ArrayList<>(expectedItemCount);
+        int capacity = expectedItemCount;
+        int i = 0;
+        while (capacity > 0) {
+            String groupName = Character.toString((char)('a' + rand.nextInt(maxGroupCount)));
+            int runLength = Math.min(capacity, 1 + rand.nextInt(runLengthLimit - 1));
+            for (int j = 0; j < runLength; ++j) {
+                testData.add(Map.entry(groupName, i++));
+            }
+            capacity -= runLength;
+        }
+
+        Stream<GroupedBatch<String, Long, Entry<String, Integer>>> stream = Streams.stream(
+                new Batcher<String, Entry<String, Integer>>(Entry::getKey, maxBatchSize, 4).batch(Iter.iter(testData.iterator())));
+
+        int actualItemCount = stream.mapToInt(groupedBatch -> {
+            int r = groupedBatch.getBatch().getItems().size();
+            // Sanity check that batches are not larger than the allowed maximum size
+            Assert.assertTrue("Batch exceeded maximum size", r <= maxBatchSize);
+            return r;
+        }).sum();
+
+        Assert.assertEquals(expectedItemCount, actualItemCount);
+    }
+}