You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2022/09/08 12:16:56 UTC

[jena] branch main updated: GH-1517: Fixes bulk requests becoming cut off and wires up bulk query parameters with assembler

This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/main by this push:
     new 23c6cee45f GH-1517: Fixes bulk requests becoming cut off and wires up bulk query parameters with assembler
     new 5f49905ca1 Merge pull request #1518 from Aklakan/GH-1517
23c6cee45f is described below

commit 23c6cee45f83937783deba50d0fd6565e28aa4f9
Author: Claus Stadler <Ra...@googlemail.com>
AuthorDate: Tue Sep 6 01:05:14 2022 +0200

    GH-1517: Fixes bulk requests becoming cut off and wires up bulk query parameters with assembler
---
 .../assembler/DatasetAssemblerServiceEnhancer.java |  24 +++
 .../enhancer/assembler/ServiceEnhancerVocab.java   |  24 ++-
 .../jena/sparql/service/enhancer/impl/Batcher.java | 218 +++++++++++++++++++++
 .../impl/ChainingServiceExecutorBulkCache.java     |  11 +-
 ...ChainingServiceExecutorBulkServiceEnhancer.java |   4 +-
 .../enhancer/impl/QueryIterServiceBulk.java        |  29 ++-
 .../service/enhancer/impl/RequestScheduler.java    | 212 --------------------
 .../enhancer/init/ServiceEnhancerConstants.java    |   5 +-
 .../TestDatasetAssemblerServiceEnhancer.java       |  26 ++-
 .../sparql/service/enhancer/impl/TestBatcher.java  | 141 +++++++++++++
 10 files changed, 447 insertions(+), 247 deletions(-)

diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java
index 4e39f80ecf..79cb455862 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/DatasetAssemblerServiceEnhancer.java
@@ -20,6 +20,7 @@ package org.apache.jena.sparql.service.enhancer.assembler;
 
 import java.util.Objects;
 
+import org.apache.commons.lang3.function.TriFunction;
 import org.apache.jena.assembler.Assembler;
 import org.apache.jena.assembler.exceptions.AssemblerException;
 import org.apache.jena.atlas.logging.Log;
@@ -28,16 +29,19 @@ import org.apache.jena.graph.Node;
 import org.apache.jena.query.ARQ;
 import org.apache.jena.query.Dataset;
 import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.Property;
 import org.apache.jena.rdf.model.RDFNode;
 import org.apache.jena.rdf.model.Resource;
 import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.sparql.core.DatasetGraphWrapper;
 import org.apache.jena.sparql.core.assembler.DatasetAssembler;
+import org.apache.jena.sparql.service.enhancer.impl.ChainingServiceExecutorBulkCache;
 import org.apache.jena.sparql.service.enhancer.impl.ServiceResponseCache;
 import org.apache.jena.sparql.service.enhancer.impl.util.GraphUtilsExtra;
 import org.apache.jena.sparql.service.enhancer.init.ServiceEnhancerConstants;
 import org.apache.jena.sparql.service.enhancer.init.ServiceEnhancerInit;
 import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.Symbol;
 import org.apache.jena.sparql.util.graph.GraphUtils;
 
 /**
@@ -87,6 +91,16 @@ public class DatasetAssemblerServiceEnhancer
                 ServiceResponseCache.set(cxt, cache);
             }
 
+            // Transfer values from the RDF model to the context
+            configureCxt(root, ServiceEnhancerVocab.bulkMaxSize, cxt, ServiceEnhancerConstants.serviceBulkMaxBindingCount,
+                    false, ChainingServiceExecutorBulkCache.DFT_MAX_BULK_SIZE, GraphUtilsExtra::getAsInt);
+
+            configureCxt(root, ServiceEnhancerVocab.bulkSize, cxt, ServiceEnhancerConstants.serviceBulkBindingCount,
+                    false, ChainingServiceExecutorBulkCache.DFT_BULK_SIZE, GraphUtilsExtra::getAsInt);
+
+            configureCxt(root, ServiceEnhancerVocab.bulkMaxOutOfBandSize, cxt, ServiceEnhancerConstants.serviceBulkMaxOutOfBandBindingCount,
+                    false, ChainingServiceExecutorBulkCache.DFT_MAX_OUT_OUF_BAND_SIZE, GraphUtilsExtra::getAsInt);
+
             // If management is enabled then return a wrapped dataset with a copy of the context which has
             // mgmt enabled
             if (enableMgmt) {
@@ -103,4 +117,14 @@ public class DatasetAssemblerServiceEnhancer
 
         return result.asDatasetGraph();
     }
+
+
+    /** Transfer a resource's property value to a context symbol's value */
+    private static <T> void configureCxt(Resource root, Property property, Context cxt, Symbol symbol, boolean applyDefaultValueIfPropertyAbsent, T defaultValue, TriFunction<Resource, Property, T, T> getValue) {
+        if (root.hasProperty(property) || applyDefaultValueIfPropertyAbsent) {
+            Object value = getValue.apply(root, property, defaultValue);
+            cxt.set(symbol, value);
+        }
+
+    }
 }
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java
index 11444de2e6..d298e7b84b 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/assembler/ServiceEnhancerVocab.java
@@ -34,23 +34,31 @@ public class ServiceEnhancerVocab {
     public static final Resource DatasetServiceEnhancer = ResourceFactory.createResource(NS + "DatasetServiceEnhancer");
 
     /** The id (a node) to which to resolve urn:x-arq:self */
-    public static final Property datasetId              = ResourceFactory.createProperty(NS + "datasetId");
+    public static final Property datasetId = ResourceFactory.createProperty(NS + "datasetId");
 
     /** Enable privileged management functions; creates a wrapped dataset with a copied context */
-    public static final Property enableMgmt             = ResourceFactory.createProperty(NS + "enableMgmt");
+    public static final Property enableMgmt = ResourceFactory.createProperty(NS + "enableMgmt");
 
-    // The term "baseDataset" is not officially in ja but it seems reasonable to eventually add it there
-    // (so far ja only defines baseModel)
-    public static final Property baseDataset            = ResourceFactory.createProperty(JA.getURI() + "baseDataset");
+    /** The term "baseDataset" is not officially in ja but it seems reasonable to eventually add it there.
+     * So far ja only defines baseModel */
+    public static final Property baseDataset = ResourceFactory.createProperty(JA.getURI() + "baseDataset");
 
     /** Maximum number of entries the service cache can hold */
-    public static final Property cacheMaxEntryCount = ResourceFactory.createProperty(NS + "cacheMaxEntryCount") ;
+    public static final Property cacheMaxEntryCount = ResourceFactory.createProperty(NS + "cacheMaxEntryCount");
 
     /** Number number of pages for bindings an individual cache entry can hold */
-    public static final Property cacheMaxPageCount = ResourceFactory.createProperty(NS + "cacheMaxPageCount") ;
+    public static final Property cacheMaxPageCount = ResourceFactory.createProperty(NS + "cacheMaxPageCount");
 
     /** Number of bindings a page can hold */
-    public static final Property cachePageSize = ResourceFactory.createProperty(NS + "cachePageSize") ;
+    public static final Property cachePageSize = ResourceFactory.createProperty(NS + "cachePageSize");
+
+    /** Maximum size (in terms of input bindings) of bulk requests */
+    public static final Property bulkMaxSize = ResourceFactory.createProperty(NS + "bulkMaxSize");
+
+    /** Bulk size to use if no other is set. Capped by bulkMaxSize. */
+    public static final Property bulkSize = ResourceFactory.createProperty(NS + "bulkSize");
+
+    public static final Property bulkMaxOutOfBandSize = ResourceFactory.createProperty(NS + "bulkMaxOutOfBandSize");
 
     /** Adds the following prefix declarations to the given map thereby overrides existing ones:
      * <table style="border: 1px solid;">
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/Batcher.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/Batcher.java
new file mode 100644
index 0000000000..5df9a41328
--- /dev/null
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/Batcher.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.service.enhancer.impl;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.ext.com.google.common.collect.AbstractIterator;
+
+/**
+ * The batcher transforms an iterator of input items into an iterator of batches.
+ * Thereby, items are assigned incremental ids.
+ * Every returned batch will start with the lowest item index not covered by any prior batch.
+ *
+ * Parameters are:
+ * <ul>
+ * <li>maxBulkSize: The maximum size of the batches which to form</li>
+ * <li>maxOutOfBandItemCount: The maximum number of out-of-band-items when forming the next batch.
+ *       Once this threshold is reached then a batch is returned even if it wasn't full yet.</li>
+ * </ul>
+ *
+ * A batch is guaranteed to have at least one item.
+ *
+ * @param <G> group key type (e.g. type of service IRIs)
+ * @param <I> item type (e.g. Binding)
+ */
+public class Batcher<G, I> {
+    /** The maximum size of the batches formed by this class */
+    protected int maxBulkSize;
+
+    /** Allow up to this number of out-of-band-items when forming the next batch.
+     * Once this threshold is reached then a batch is returned even if it wasn't full yet.
+     */
+    protected int maxOutOfBandItemCount;
+
+    /** Function to map an input item to a group key*/
+    protected Function<I, G> itemToGroupKey;
+
+    public Batcher(Function<I, G> itemToGroupKey, int maxBulkSize, int maxOutOfBandItemCount) {
+        super();
+        this.itemToGroupKey = itemToGroupKey;
+        this.maxBulkSize = maxBulkSize;
+        this.maxOutOfBandItemCount = maxOutOfBandItemCount;
+    }
+
+    public IteratorCloseable<GroupedBatch<G, Long, I>> batch(IteratorCloseable<I> inputIterator) {
+        return new IteratorGroupedBatch(inputIterator);
+    }
+
+    class IteratorGroupedBatch
+        extends AbstractIterator<GroupedBatch<G, Long, I>> implements IteratorCloseable<GroupedBatch<G, Long, I>>
+    {
+        protected IteratorCloseable<I> inputIterator;
+
+        /** The position of the inputIterator */
+        protected long inputIteratorOffset;
+
+        // Maps the start offset of each pending batch to its group key
+        protected NavigableMap<Long, G> nextGroup = new TreeMap<>();
+
+        // For each group key: a navigable map from a batch's start offset (its lowest item offset) to that batch
+        protected Map<G, NavigableMap<Long, Batch<Long, I>>> groupToBatches = new HashMap<>();
+
+        public IteratorGroupedBatch(IteratorCloseable<I> inputIterator) {
+            this(inputIterator, 0);
+        }
+
+        public IteratorGroupedBatch(IteratorCloseable<I> inputIterator, int inputIteratorOffset) {
+            super();
+            this.inputIterator = inputIterator;
+            this.inputIteratorOffset = inputIteratorOffset;
+        }
+
+        @Override
+        protected GroupedBatch<G, Long, I> computeNext() {
+            // For the current result group key and corresponding batch determine how many out-of-band
+            // items we have already consumed from the input iterator
+            // Any item that does not contribute to the current result batch counts as out-of-band
+
+            // The key of the first pending batch - null if there is none yet
+            Optional<G> resultGroupKeyOpt = Optional.ofNullable(nextGroup.firstEntry()).map(Entry::getValue);
+            G resultGroupKey = resultGroupKeyOpt.orElse(null);
+
+            Optional<Entry<Long, Batch<Long, I>>> resultBatchEntry = resultGroupKeyOpt
+                .map(groupToBatches::get).map(offsetToBatch -> offsetToBatch.firstEntry());
+
+            long resultBatchMinOffset = resultBatchEntry.map(Entry::getKey).orElse(inputIteratorOffset);
+
+            // The result batch - empty if there is none
+            Optional<Batch<Long, I>> resultBatch = resultBatchEntry.map(Entry::getValue);
+
+            // Get the highest offset of the result batch (if there is one)
+            // If a new batch has yet to be created its item ids start with inputIteratorOffset
+            long resultBatchMaxOffset = resultBatch.map(Batch::getNextValidIndex).orElse(inputIteratorOffset);
+
+            int resultBatchSize = resultBatch.map(Batch::getItems).map(Map::size).orElse(0);
+
+            long outOfBandItemCountInResultBatch = resultBatchMaxOffset - resultBatchMinOffset - resultBatchSize;
+            long outOfBandItemCountInOtherBatches = inputIteratorOffset - resultBatchMaxOffset;
+
+            // The outOfBandItemCount may be counted up in the loop below
+            long outOfBandItemCount = outOfBandItemCountInResultBatch + outOfBandItemCountInOtherBatches;
+
+            // The following four variables are re-initialized whenever the groupKey changes
+            G previousGroupKey = null; // The input iterator must never yield a null group key
+            Batch<Long, I> currentBatch = null;
+            NavigableMap<Long, Batch<Long, I>> currentBatches = null;
+            int currentBatchSize = -1;
+
+            // Only look at the input iterator if the current batch is not yet full
+            if (resultBatchSize < maxBulkSize) {
+                while (inputIterator.hasNext() && outOfBandItemCount <= maxOutOfBandItemCount) {
+                    I input = inputIterator.next();
+                    G currentGroupKey = itemToGroupKey.apply(input);
+                    Objects.requireNonNull(currentGroupKey); // Sanity check
+
+                    if (!Objects.equals(currentGroupKey, previousGroupKey)) {
+                        previousGroupKey = currentGroupKey;
+
+                        currentBatches = groupToBatches.computeIfAbsent(currentGroupKey, x -> new TreeMap<>());
+                        if (currentBatches.isEmpty()) {
+                            currentBatch = BatchImpl.forLong();
+                            currentBatches.put(inputIteratorOffset, currentBatch);
+                            nextGroup.put(inputIteratorOffset, currentGroupKey);
+                        } else {
+                            currentBatch = currentBatches.lastEntry().getValue();
+                        }
+                        currentBatchSize = currentBatch.size();
+
+                        if (resultGroupKey == null) {
+                            resultGroupKey = currentGroupKey;
+                        }
+                    }
+
+                    // Add the item to the current batch if it is not full
+                    // Otherwise start a new batch
+                    Batch<Long, I> insertTargetBatch;
+                    if (currentBatchSize >= maxBulkSize) {
+                        insertTargetBatch = BatchImpl.forLong();
+                        currentBatches.put(inputIteratorOffset, insertTargetBatch);
+                        nextGroup.put(inputIteratorOffset, currentGroupKey);
+                    } else {
+                        insertTargetBatch = currentBatch;
+                        // We are just about to add an item to the current batch
+                        ++currentBatchSize;
+                    }
+                    insertTargetBatch.put(inputIteratorOffset, input);
+                    ++inputIteratorOffset;
+
+                    // If we just completely filled up the result batch then break
+                    if (currentGroupKey.equals(resultGroupKey)) {
+                        if (currentBatchSize >= maxBulkSize) {
+                            break;
+                        }
+                    } else {
+                        ++outOfBandItemCount;
+                    }
+
+                    // Update the state if we started a new batch (because the current one was full)
+                    if (insertTargetBatch != currentBatch) {
+                        currentBatch = insertTargetBatch;
+                        currentBatchSize = insertTargetBatch.size();
+                    }
+                }
+            }
+
+            // Return and remove the first batch from our data structures
+            GroupedBatch<G, Long, I> result;
+            Iterator<Entry<Long, G>> nextGroupIt = nextGroup.entrySet().iterator();
+            if (nextGroupIt.hasNext()) {
+                Entry<Long, G> e = nextGroupIt.next();
+                resultGroupKey = e.getValue();
+                nextGroupIt.remove();
+                // nextInputId = e.getKey();
+
+                NavigableMap<Long, Batch<Long, I>> nextBatches = groupToBatches.get(resultGroupKey);
+                Iterator<Batch<Long, I>> nextBatchesIt = nextBatches.values().iterator();
+                Batch<Long, I> resultBatchTmp = nextBatchesIt.next();
+                nextBatchesIt.remove();
+
+                result = new GroupedBatchImpl<>(resultGroupKey, resultBatchTmp);
+            } else {
+                result = endOfData();
+            }
+            return result;
+        }
+
+        @Override
+        public void close() {
+            Iter.close(inputIterator);
+        }
+    }
+}
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java
index 7c385cb678..b263c0aa09 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkCache.java
@@ -36,9 +36,9 @@ import org.apache.jena.sparql.util.Context;
 public class ChainingServiceExecutorBulkCache
     implements ChainingServiceExecutorBulk {
 
-    public static final int DEFAULT_BULK_SIZE = 30;
-    public static final int MAX_BULK_SIZE = 100;
-    public static final int DEFAULT_MAX_BYTE_SIZE = 5000;
+    public static final int DFT_BULK_SIZE = 30;
+    public static final int DFT_MAX_BULK_SIZE = 100;
+    public static final int DFT_MAX_OUT_OUF_BAND_SIZE = 30;
 
     protected int bulkSize;
     protected CacheMode cacheMode;
@@ -67,8 +67,9 @@ public class ChainingServiceExecutorBulkCache
 
         OpServiceExecutorImpl opExecutor = new OpServiceExecutorImpl(serviceInfo.getOpService(), execCxt, chain);
 
-        RequestScheduler<Node, Binding> scheduler = new RequestScheduler<>(serviceInfo::getSubstServiceNode, bulkSize);
-        IteratorCloseable<GroupedBatch<Node, Long, Binding>> inputBatchIterator = scheduler.group(input);
+        int maxOutOfBandItemCount = cxt.getInt(ServiceEnhancerConstants.serviceBulkMaxOutOfBandBindingCount, DFT_MAX_OUT_OUF_BAND_SIZE);
+        Batcher<Node, Binding> scheduler = new Batcher<>(serviceInfo::getSubstServiceNode, bulkSize, maxOutOfBandItemCount);
+        IteratorCloseable<GroupedBatch<Node, Long, Binding>> inputBatchIterator = scheduler.batch(input);
 
         RequestExecutor exec = new RequestExecutor(opExecutor, serviceInfo, resultSizeCache, serviceCache, cacheMode, inputBatchIterator);
 
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java
index 4db5816e67..f3c1e91a54 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/ChainingServiceExecutorBulkServiceEnhancer.java
@@ -76,8 +76,8 @@ public class ChainingServiceExecutorBulkServiceEnhancer
             case ServiceOpts.SO_BULK: // Enables bulk requests
                 enableBulk = true;
 
-                int maxBulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkMaxBindingCount, ChainingServiceExecutorBulkCache.MAX_BULK_SIZE);
-                bulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkBindingCount, ChainingServiceExecutorBulkCache.DEFAULT_BULK_SIZE);
+                int maxBulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkMaxBindingCount, ChainingServiceExecutorBulkCache.DFT_MAX_BULK_SIZE);
+                bulkSize = cxt.get(ServiceEnhancerConstants.serviceBulkBindingCount, ChainingServiceExecutorBulkCache.DFT_BULK_SIZE);
                 try {
                     if (val == null || val.isBlank()) {
                         // Ignored
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java
index 695236aeb2..3df6bec0df 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/QueryIterServiceBulk.java
@@ -33,7 +33,6 @@ import org.apache.jena.atlas.iterator.Iter;
 import org.apache.jena.atlas.iterator.IteratorCloseable;
 import org.apache.jena.atlas.iterator.IteratorOnClose;
 import org.apache.jena.atlas.lib.Closeable;
-import org.apache.jena.atlas.logging.Log;
 import org.apache.jena.ext.com.google.common.collect.Iterators;
 import org.apache.jena.ext.com.google.common.collect.Range;
 import org.apache.jena.ext.com.google.common.collect.RangeMap;
@@ -71,6 +70,8 @@ import org.apache.jena.sparql.service.enhancer.slice.api.ReadableChannelWithLimi
 import org.apache.jena.sparql.service.enhancer.slice.api.Slice;
 import org.apache.jena.sparql.service.enhancer.slice.api.SliceAccessor;
 import org.apache.jena.sparql.util.NodeFactoryExtra;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * QueryIter to process service requests in bulk with support for streaming caching.
@@ -81,6 +82,8 @@ import org.apache.jena.sparql.util.NodeFactoryExtra;
 public class QueryIterServiceBulk
     extends QueryIterSlottedBase
 {
+    private static final Logger logger = LoggerFactory.getLogger(QueryIterServiceBulk.class);
+
     protected OpServiceInfo serviceInfo;
     protected ServiceCacheKeyFactory cacheKeyFactory;
 
@@ -196,11 +199,11 @@ public class QueryIterServiceBulk
             boolean isBackendIt = sliceKeysForBackend.contains(partKey);
 
             if (isBackendIt && !activeIt.hasNext()) {
-                Log.debug(QueryIterServiceBulk.class, "Iterator ended without end marker - assuming remote result set limit reached");
+                logger.debug("Iterator ended without end marker - assuming remote result set limit reached");
                 long seenBackendData = backendIt.getOffset();
                 backendResultSetLimit = new Estimate<>(seenBackendData, true);
                 if (seenBackendData <= 0) {
-                    Log.warn(QueryIterServiceBulk.class, "Known result set limit of " + seenBackendData + " detected");
+                    logger.warn("Known result set limit of " + seenBackendData + " detected");
                 }
 
                 resultSizeCache.updateLimit(targetService, backendResultSetLimit);
@@ -470,7 +473,10 @@ public class QueryIterServiceBulk
         int nextAllocOutputId = 0;
         int batchSize = inputs.size();
 
-        Log.info(QueryIterServiceBulk.class, "Schedule for current batch:");
+        if (logger.isInfoEnabled()) {
+            logger.info("Schedule for current batch:");
+        }
+
         int rangeId = currentRangeId;
 
         for (int inputId = currentInputId; inputId < batchSize; ++inputId) {
@@ -508,7 +514,9 @@ public class QueryIterServiceBulk
 
                 lock = slice.getReadWriteLock().readLock();
 
-                Log.debug(QueryIterServiceBulk.class, "Created cache key: " + cacheKey);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Created cache key: " + cacheKey);
+                }
                 // Log.debug(BatchRequestIterator.class, "Cached ranges: " + slice.getLoadedRanges().toString());
 
                 lock.lock();
@@ -577,11 +585,12 @@ public class QueryIterServiceBulk
                 //   - We need to start the backend request from the request offset
                 //   - The issue is how to handle the next binding
 
-                Log.info(QueryIterServiceBulk.class, "input " + inputId + ": " +
-                    allRanges.toString()
-                        .replace("false", "fetch")
-                        .replace("true", "cached"));
-
+                if (logger.isInfoEnabled()) {
+                    logger.info("input " + inputId + ": " +
+                        allRanges.toString()
+                            .replace("false", "fetch")
+                            .replace("true", "cached"));
+                }
                 Map<Range<Long>, Boolean> mapOfRanges = allRanges.asMapOfRanges();
 
                 if (mapOfRanges.isEmpty()) {
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/RequestScheduler.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/RequestScheduler.java
deleted file mode 100644
index c09a06a1f0..0000000000
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/impl/RequestScheduler.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.service.enhancer.impl;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.function.Function;
-
-import org.apache.jena.atlas.iterator.Iter;
-import org.apache.jena.atlas.iterator.IteratorCloseable;
-import org.apache.jena.ext.com.google.common.collect.AbstractIterator;
-
-/**
- * Accumulates items from an input iterator into batches.
- * Every returned batch will start with the first item index not covered by any prior batch.
- *
- * Parameters are:
- * <ul>
- * <li>maxBulkSize: The maximum number of items allowed in a batch returned by a call to next()</li>
- * <li>maxeReadAhead: The maximum number of items allowed to read from the input iterator in order to fill a batch</li>
- * <li>maxInputDistance: The index of items w.r.t. to the input iterator in a batch must not be farther apart than this distance</li>
- * </ul>
- *
- * A batch is guaranteed to have at least one item.
- * If any of the thresholds is exceeded a batch will have fewer items that its maximum allowed size.
- *
- *
- * @param <G> group key type (e.g. service IRI)
- * @param <I> item type (e.g. Binding)
- */
-public class RequestScheduler<G, I> {
-    protected int maxBulkSize;
-
-    /** Allow reading at most this number of items ahead for the input iterator to completely fill
-     *  the batch request for the next response */
-    protected int maxReadAhead = 300;
-
-    /** Do not group inputs into the same batch if their ids are this (or more) of that amount apart */
-    protected int maxInputDistance = 50;
-
-    // protected Iterator<I> inputIterator;
-    protected Function<I, G> inputToGroup;
-
-    public RequestScheduler(Function<I, G> inputToGroup, int maxBulkSize) {
-        super();
-        this.inputToGroup = inputToGroup;
-        this.maxBulkSize = maxBulkSize;
-    }
-
-    public IteratorCloseable<GroupedBatch<G, Long, I>> group(IteratorCloseable<I> inputIterator) {
-        return new Grouper(inputIterator);
-    }
-
-    class Grouper
-        extends AbstractIterator<GroupedBatch<G, Long, I>> implements IteratorCloseable<GroupedBatch<G, Long, I>>
-    {
-        protected IteratorCloseable<I> inputIterator;
-
-        /** The position of the inputIterator */
-        protected long inputIteratorOffset;
-
-        /** The offset of the next item being returned */
-        protected long nextResultOffset;
-
-        protected long nextInputId;
-
-        // the outer navigable map has to lowest offset of the batch
-        protected Map<G, NavigableMap<Long, Batch<Long, I>>> groupToBatches = new HashMap<>();
-
-        // Offsets of the group keys
-        protected NavigableMap<Long, G> nextGroup = new TreeMap<>();
-
-        public Grouper(IteratorCloseable<I> inputIterator) {
-            this(inputIterator, 0);
-        }
-
-        public Grouper(IteratorCloseable<I> inputIterator, int inputIteratorOffset) {
-            super();
-            this.inputIterator = inputIterator;
-            this.inputIteratorOffset = inputIteratorOffset;
-            this.nextResultOffset = inputIteratorOffset;
-        }
-
-        @Override
-        protected GroupedBatch<G, Long, I> computeNext() {
-            G resultGroupKey = Optional.ofNullable(nextGroup.firstEntry()).map(Entry::getValue).orElse(null);
-            G lastGroupKey = null;
-
-            // Cached references
-            NavigableMap<Long, Batch<Long, I>> batches = null;
-            Batch<Long, I> batch = null;
-
-            while (inputIterator.hasNext() && inputIteratorOffset - nextResultOffset < maxReadAhead) {
-                I input = inputIterator.next();
-                G groupKey = inputToGroup.apply(input);
-
-                if (!Objects.equals(groupKey, lastGroupKey)) {
-                    lastGroupKey = groupKey;
-
-                    if (resultGroupKey == null) {
-                        resultGroupKey = groupKey;
-                    }
-
-                    batches = groupToBatches.computeIfAbsent(groupKey, x -> new TreeMap<>());
-                    if (batches.isEmpty()) {
-                        batch = BatchImpl.forLong();
-                        batches.put(inputIteratorOffset, batch);
-                        nextGroup.put(inputIteratorOffset, groupKey);
-                    } else {
-                        batch = batches.lastEntry().getValue();
-                    }
-                }
-
-                // Check whether we need to start a new request
-                // Either because the batch is full or the differences between the input ids is too great
-                long batchEndOffset = batch.getNextValidIndex();
-                long distance = nextInputId - batchEndOffset;
-
-                // If the item is consecutive add it to the list
-                int batchSize = batch.size();
-                if (distance > maxInputDistance || batchSize >= maxBulkSize) {
-                    batch = BatchImpl.forLong();
-                    batches.put(inputIteratorOffset, batch);
-                }
-                batch.put(inputIteratorOffset, input);
-                ++inputIteratorOffset;
-
-                // If the batch of the result group just became full then break
-                if (groupKey.equals(resultGroupKey) && batchSize + 1 >= maxBulkSize) {
-                    break;
-                }
-            }
-
-            // Return and remove the first batch from our data structures
-
-            GroupedBatch<G, Long, I> result;
-            Iterator<Entry<Long, G>> nextGroupIt = nextGroup.entrySet().iterator();
-            if (nextGroupIt.hasNext()) {
-                Entry<Long, G> e = nextGroupIt.next();
-                resultGroupKey = e.getValue();
-                nextGroupIt.remove();
-                nextInputId = e.getKey();
-
-                NavigableMap<Long, Batch<Long, I>> nextBatches = groupToBatches.get(resultGroupKey);
-                Iterator<Batch<Long, I>> nextBatchesIt = nextBatches.values().iterator();
-                Batch<Long, I> resultBatch = nextBatchesIt.next();
-                nextBatchesIt.remove();
-
-                result = new GroupedBatchImpl<>(resultGroupKey, resultBatch);
-            } else {
-                result = endOfData();
-            }
-            return result;
-        }
-
-        @Override
-        public void close() {
-            Iter.close(inputIterator);
-        }
-    }
-
-//
-//	public static void main(String[] args) {
-//		Var v = Var.alloc("v");
-//		Iterator<Binding> individualIt = IntStream.range(0, 10)
-//				.mapToObj(x -> BindingFactory.binding(v, NodeValue.makeInteger(x).asNode()))
-//				.iterator();
-//
-//		Op op = Algebra.compile(QueryFactory.create("SELECT * { ?v ?p ?o }"));
-//		OpService opService = new OpService(v, op, false);
-//		OpServiceInfo serviceInfo = new OpServiceInfo(opService);
-//
-//
-//		RequestScheduler<Node, Binding> scheduler = new RequestScheduler<>(b ->
-//			NodeFactory.createLiteral("group" + (NodeValue.makeNode(b.get(v)).getInteger().intValue() % 3)), 2);
-//		Iterator<ServiceBatchRequest<Node, Binding>> batchIt = scheduler.group(individualIt);
-//
-//		OpServiceExecutorImpl opExecutor = null;
-//
-//		RequestExecutor executor = new RequestExecutor(opExecutor, serviceInfo, batchIt);
-//		// executor.exec();
-//
-////		while (batchIt.hasNext()) {
-////			System.out.println(batchIt.next());
-////		}
-//
-//
-//	}
-//
-}
diff --git a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java
index 76d3f5a69d..5c08373ead 100644
--- a/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java
+++ b/jena-extras/jena-serviceenhancer/src/main/java/org/apache/jena/sparql/service/enhancer/init/ServiceEnhancerConstants.java
@@ -36,8 +36,11 @@ public class ServiceEnhancerConstants {
     /** Maximum number of bindings to group into a single bulk request; restricts serviceBulkRequestItemCount */
     public static final Symbol serviceBulkMaxBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkMaxBindingCount") ;
 
+    /** Maximum number of out-of-band bindings that can be skipped over when forming an individual bulk request */
+    public static final Symbol serviceBulkMaxOutOfBandBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkMaxOutOfBandBindingCount") ;
+
     /** Number of bindings to group into a single bulk request */
-    public static final Symbol serviceBulkBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkMaxBindingCount") ;
+    public static final Symbol serviceBulkBindingCount = SystemARQ.allocSymbol(NS, "serviceBulkBindingCount") ;
 
     /** Symbol for the cache of services' result sets */
     public static final Symbol serviceCache = SystemARQ.allocSymbol(NS, "serviceCache") ;
diff --git a/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java
index 321d4475a9..e9aea003c1 100644
--- a/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java
+++ b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/assembler/TestDatasetAssemblerServiceEnhancer.java
@@ -30,31 +30,39 @@ import org.apache.jena.rdf.model.Model;
 import org.apache.jena.rdf.model.ModelFactory;
 import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.sparql.service.enhancer.init.ServiceEnhancerConstants;
+import org.apache.jena.sparql.util.Context;
 import org.apache.jena.vocabulary.RDF;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestDatasetAssemblerServiceEnhancer
 {
-    /**
-     * This test case attempts to assemble a dataset with the service enhancer plugin
-     * set up in its context. A query making use of enhancer features is fired against it.
-     * Only if the plugin is loaded successfully then the query will execute successfully.
-     */
-    @Test
-    public void testAssembler() {
-        String specStr = String.join("\n",
+    private static final String SPEC_STR_01 = String.join("\n",
             "PREFIX ja: <http://jena.hpl.hp.com/2005/11/Assembler#>",
             "PREFIX se: <http://jena.apache.org/service-enhancer#>",
             "<urn:example:root> a se:DatasetServiceEnhancer ; ja:baseDataset <urn:example:base> .",
             "<urn:example:root> se:cacheMaxEntryCount 5 ; se:cachePageSize 1000 ; se:cacheMaxPageCount 10 .",
+            "<urn:example:root> se:bulkMaxSize 20 ; se:bulkSize 10 ; se:bulkMaxOutOfBandSize 5 .",
             "<urn:example:base> a ja:MemoryDataset ."
         );
 
+    /**
+     * Assembles a dataset with the service enhancer plugin set up in its context and checks
+     * that the bulk settings of the specification end up in that context. A query making use of
+     * enhancer features is then fired against it; it only succeeds if the plugin was loaded.
+     */
+    @Test
+    public void testAssembler() {
         Model spec = ModelFactory.createDefaultModel();
-        RDFDataMgr.read(spec, new StringReader(specStr), null, Lang.TURTLE);
+        RDFDataMgr.read(spec, new StringReader(SPEC_STR_01), null, Lang.TURTLE);
 
         Dataset dataset = DatasetFactory.assemble(spec.getResource("urn:example:root"));
+        Context cxt = dataset.getContext();
+
+        Assert.assertEquals(20, cxt.getInt(ServiceEnhancerConstants.serviceBulkMaxBindingCount, -1));
+        Assert.assertEquals(10, cxt.getInt(ServiceEnhancerConstants.serviceBulkBindingCount, -1));
+        Assert.assertEquals(5, cxt.getInt(ServiceEnhancerConstants.serviceBulkMaxOutOfBandBindingCount, -1));
 
         try (QueryExecution qe = QueryExecutionFactory.create(
                 "SELECT * { BIND(<urn:example:x> AS ?x) SERVICE <loop:bulk+10:> { ?x ?y ?z } }", dataset)) {
diff --git a/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/impl/TestBatcher.java b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/impl/TestBatcher.java
new file mode 100644
index 0000000000..e5e53dd60d
--- /dev/null
+++ b/jena-extras/jena-serviceenhancer/src/test/java/org/apache/jena/sparql/service/enhancer/impl/TestBatcher.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.service.enhancer.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.ext.com.google.common.collect.Streams;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestBatcher {
+
+    private static final List<Entry<String, Integer>> testData01 = List.<Entry<String, Integer>>of(
+            Map.entry("a", 0),
+            Map.entry("a", 1),
+            Map.entry("b", 2),
+            Map.entry("b", 3),
+            Map.entry("b", 4),
+            Map.entry("a", 5),
+            Map.entry("c", 6),
+            Map.entry("a", 7),
+            Map.entry("c", 8),
+            Map.entry("c", 9),
+            Map.entry("d", 10),
+            Map.entry("d", 11),
+            Map.entry("c", 12),
+            Map.entry("c", 13),
+            Map.entry("d", 14),
+            Map.entry("e", 15));
+
+    /** Test that grouping items correctly considers maxBatchSize and maxOutOfBandItemCount.
+     * Batch size = 3; up to two out-of-band items are allowed to form a batch.
+     */
+    @Test
+    public void testBatcher_01_3_2() {
+        // The numbers below refer to the value-component of the test data
+        List<List<Integer>> expectedBatchIds = List.<List<Integer>>of(
+                List.of(0, 1),
+                List.of(2, 3, 4),
+                List.of(5, 7),
+                List.of(6, 8, 9),
+                List.of(10, 11, 14),
+                List.of(12, 13),
+                List.of(15)
+        );
+        eval(testData01, 3, 2, expectedBatchIds);
+    }
+
+    /** Form batches up to size 3 allowing 0 out-of-band items; every change in the group id starts a new batch */
+    @Test
+    public void testBatcher_01_3_0() {
+        // The numbers below refer to the value-component of the test data
+        List<List<Integer>> expectedBatchIds = List.<List<Integer>>of(
+                List.of(0, 1),
+                List.of(2, 3, 4),
+                List.of(5),
+                List.of(6),
+                List.of(7),
+                List.of(8, 9),
+                List.of(10, 11),
+                List.of(12, 13),
+                List.of(14),
+                List.of(15)
+        );
+        eval(testData01, 3, 0, expectedBatchIds);
+    }
+
+    public static void eval(
+            List<Entry<String, Integer>> input,
+            int maxBatchSize,
+            int maxOutOfBandItemCount,
+            List<List<Integer>> expectedBatchIds) {
+        IteratorCloseable<GroupedBatch<String, Long, Entry<String, Integer>>> it = new Batcher<String, Entry<String, Integer>>
+            (Entry::getKey, maxBatchSize, maxOutOfBandItemCount).batch(Iter.iter(input.iterator()));
+        // it.forEachRemaining(System.err::println);
+
+        // For each obtained batch extract the list of values
+        List<List<Integer>> actualBatchIds = Streams.stream(it)
+                .map(groupedBatch -> groupedBatch.getBatch().getItems().values().stream().map(Entry::getValue).collect(Collectors.toList()))
+                .collect(Collectors.toList());
+
+        Assert.assertEquals(expectedBatchIds, actualBatchIds);
+    }
+
+    /** This test creates random input, batches it and checks that no batch exceeds the configured maximum size and that the number of batched items matches that of the input */
+    @Test
+    public void testBatcher_largeInput() {
+        int expectedItemCount = 10000;
+        int maxRunLength = 10; // Runs of consecutive items sharing a group have fewer than this many items
+        int maxGroupCount = 10;
+        int maxBatchSize = 4; // The limits the batcher under test is configured with
+        int maxOutOfBandItemCount = 4;
+        Random rand = new Random(0); // Fixed seed so that the generated input is reproducible
+
+        List<Entry<String, Integer>> testData = new ArrayList<>(expectedItemCount);
+        int capacity = expectedItemCount;
+        int i = 0;
+        while (capacity > 0) {
+            String groupName = Character.toString((char)('a' + rand.nextInt(maxGroupCount)));
+            int runLength = Math.min(capacity, 1 + rand.nextInt(maxRunLength - 1));
+            for (int j = 0; j < runLength; ++j) {
+                testData.add(Map.entry(groupName, i++));
+            }
+            capacity -= runLength;
+        }
+
+        Stream<GroupedBatch<String, Long, Entry<String, Integer>>> stream = Streams.stream(new Batcher<String, Entry<String, Integer>>(
+                Entry::getKey, maxBatchSize, maxOutOfBandItemCount).batch(Iter.iter(testData.iterator())));
+        int actualItemCount = stream.mapToInt(groupedBatch -> {
+            int r = groupedBatch.getBatch().getItems().size();
+            // Sanity check that batches are not larger than the size limit passed to the batcher
+            Assert.assertTrue("Batch exceeded maximum size", r <= maxBatchSize);
+            return r;
+        }).sum();
+        Assert.assertEquals(expectedItemCount, actualItemCount);
+    }
+}