You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2017/03/24 15:37:24 UTC

svn commit: r1788476 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/rdb/ test/java/org/apache/jackrabbit/oak/plugins/document/rdb/

Author: reschke
Date: Fri Mar 24 15:37:24 2017
New Revision: 1788476

URL: http://svn.apache.org/viewvc?rev=1788476&view=rev
Log:
OAK-5855: RDBDocumentStore: improve query support for VersionGC

- APIs to query for Iterable<Document>
- test coverage
- use new API in RDBVersionGCSupport

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1788476&r1=1788475&r2=1788476&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Fri Mar 24 15:37:24 2017
@@ -25,6 +25,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.createTableName;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.sql.Connection;
@@ -61,8 +62,6 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.sql.DataSource;
 
-import com.google.common.base.Function;
-import com.google.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.cache.CacheValue;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -83,11 +82,15 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
 import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterator;
 import org.apache.jackrabbit.oak.util.OakVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1485,6 +1488,58 @@ public class RDBDocumentStore implements
         }
     }
 
+    private static interface MyCloseableIterable<T> extends Closeable, Iterable<T> {
+    }
+
+    protected <T extends Document> Iterable<T> queryAsIterable(final Collection<T> collection, String fromKey, String toKey,
+            final List<String> excludeKeyPatterns, final List<QueryCondition> conditions, final int limit, final String sortBy) {
+
+        final RDBTableMetaData tmd = getTable(collection);
+        for (QueryCondition cond : conditions) {
+            if (!INDEXEDPROPERTIES.contains(cond.getPropertyName())) {
+                String message = "indexed property " + cond.getPropertyName() + " not supported, query was '" + cond.getOperator()
+                        + "'" + cond.getValue() + "'; supported properties are " + INDEXEDPROPERTIES;
+                LOG.info(message);
+                throw new DocumentStoreException(message);
+            }
+        }
+
+        final String from = collection == Collection.NODES && NodeDocument.MIN_ID_VALUE.equals(fromKey) ? null : fromKey;
+        final String to = collection == Collection.NODES && NodeDocument.MAX_ID_VALUE.equals(toKey) ? null : toKey;
+
+        return new MyCloseableIterable<T>() {
+
+            Set<Iterator<RDBRow>> returned = Sets.newHashSet();
+
+            @Override
+            public Iterator<T> iterator() {
+                try {
+                    Iterator<RDBRow> res = db.queryAsIterator(ch, tmd, from, to, excludeKeyPatterns, conditions,
+                            limit, sortBy);
+                    returned.add(res);
+                    Iterator<T> tmp = Iterators.transform(res, new Function<RDBRow, T>() {
+                        @Override
+                        public T apply(RDBRow input) {
+                            return convertFromDBObject(collection, input);
+                        }
+                    });
+                    return CloseableIterator.wrap(tmp, (Closeable) res);
+                } catch (SQLException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                for (Iterator<RDBRow> rdbi : returned) {
+                    if (rdbi instanceof Closeable) {
+                        ((Closeable) rdbi).close();
+                    }
+                }
+            }
+        };
+    }
+
     @Nonnull
     protected <T extends Document> RDBTableMetaData getTable(Collection<T> collection) {
         RDBTableMetaData tmd = this.tableMeta.get(collection);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java?rev=1788476&r1=1788475&r2=1788476&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java Fri Mar 24 15:37:24 2017
@@ -23,6 +23,8 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.sql.BatchUpdateException;
 import java.sql.Connection;
@@ -37,9 +39,11 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import javax.annotation.CheckForNull;
@@ -449,56 +453,13 @@ public class RDBDocumentStoreJDBC {
     public List<RDBRow> query(Connection connection, RDBTableMetaData tmd, String minId, String maxId,
             List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit) throws SQLException {
         long start = System.currentTimeMillis();
-        StringBuilder selectClause = new StringBuilder();
-        if (limit != Integer.MAX_VALUE && this.dbInfo.getFetchFirstSyntax() == FETCHFIRSTSYNTAX.TOP) {
-            selectClause.append("TOP " + limit + " ");
-        }
-        selectClause.append("ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from ").append(tmd.getName());
-
-        String whereClause = buildWhereClause(minId, maxId, excludeKeyPatterns, conditions);
-
-        StringBuilder query = new StringBuilder();
-        query.append("select ").append(selectClause);
-        if (whereClause.length() != 0) {
-            query.append(" where ").append(whereClause);
-        }
-
-        query.append(" order by ID");
-
-        if (limit != Integer.MAX_VALUE) {
-            switch (this.dbInfo.getFetchFirstSyntax()) {
-                case LIMIT:
-                    query.append(" LIMIT " + limit);
-                    break;
-                case FETCHFIRST:
-                    query.append(" FETCH FIRST " + limit + " ROWS ONLY");
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        PreparedStatement stmt = connection.prepareStatement(query.toString());
-        ResultSet rs = null;
         List<RDBRow> result = new ArrayList<RDBRow>();
         long dataTotal = 0, bdataTotal = 0;
+        PreparedStatement stmt = null;
+        ResultSet rs = null;
         try {
-            int si = 1;
-            if (minId != null) {
-                setIdInStatement(tmd, stmt, si++, minId);
-            }
-            if (maxId != null) {
-                setIdInStatement(tmd, stmt, si++, maxId);
-            }
-            for (String keyPattern : excludeKeyPatterns) {
-                setIdInStatement(tmd, stmt, si++, keyPattern);
-            }
-            for (QueryCondition cond : conditions) {
-                stmt.setLong(si++, cond.getValue());
-            }
-            if (limit != Integer.MAX_VALUE) {
-                stmt.setFetchSize(limit);
-            }
+            stmt = prepareQuery(connection, tmd, "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA", minId,
+                    maxId, excludeKeyPatterns, conditions, limit, "ID");
             rs = stmt.executeQuery();
             while (rs.next() && result.size() < limit) {
                 String id = getIdFromRS(tmd, rs, 1);
@@ -519,8 +480,8 @@ public class RDBDocumentStoreJDBC {
                 bdataTotal += bdata == null ? 0 : bdata.length;
             }
         } finally {
-            closeResultSet(rs);
             closeStatement(stmt);
+            closeResultSet(rs);
         }
 
         long elapsed = System.currentTimeMillis() - start;
@@ -558,6 +519,212 @@ public class RDBDocumentStoreJDBC {
         return result;
     }
 
+    public long getLong(Connection connection, RDBTableMetaData tmd, String selector, String minId, String maxId,
+            List<String> excludeKeyPatterns, List<QueryCondition> conditions) throws SQLException {
+        PreparedStatement stmt = null;
+        ResultSet rs = null;
+        long start = System.currentTimeMillis();
+        long result = -1;
+        try {
+            stmt = prepareQuery(connection, tmd, selector, minId, maxId, excludeKeyPatterns, conditions, Integer.MAX_VALUE, null);
+            rs = stmt.executeQuery();
+
+            result = rs.next() ? rs.getLong(1) : -1;
+            return result;
+        } finally {
+            closeStatement(stmt);
+            closeResultSet(rs);
+            if (LOG.isDebugEnabled()) {
+                long elapsed = System.currentTimeMillis() - start;
+                String params = String.format("params minid '%s' maxid '%s' excludeKeyPatterns %s conditions %s.", minId, maxId,
+                        excludeKeyPatterns, conditions);
+                LOG.debug("Aggregate query " + selector + " on " + tmd.getName() + " with " + params + " -> " + result + ", took " + elapsed
+                        + "ms");
+            }
+        }
+    }
+
+    @Nonnull
+    public Iterator<RDBRow> queryAsIterator(RDBConnectionHandler ch, RDBTableMetaData tmd, String minId, String maxId,
+            List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit, String sortBy) throws SQLException {
+        return new ResultSetIterator(ch, tmd, "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA", minId,
+                maxId, excludeKeyPatterns, conditions, limit, sortBy);
+    }
+
+    private class ResultSetIterator implements Iterator<RDBRow>, Closeable {
+
+        private RDBConnectionHandler ch;
+        private Connection connection;
+        private RDBTableMetaData tmd;
+        private PreparedStatement stmt;
+        private ResultSet rs;
+        private RDBRow next = null;
+        private Exception callstack = null;
+        private long elapsed = 0;
+        private String message = null;
+        private long cnt = 0;
+
+        public ResultSetIterator(RDBConnectionHandler ch, RDBTableMetaData tmd, String string, String minId, String maxId,
+                List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit, String sortBy) throws SQLException {
+            long start = System.currentTimeMillis();
+            try {
+                this.ch = ch;
+                this.connection = ch.getROConnection();
+                this.tmd = tmd;
+                this.stmt = prepareQuery(connection, tmd, "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA",
+                        minId, maxId, excludeKeyPatterns, conditions, limit, sortBy);
+                this.rs = stmt.executeQuery();
+                this.next = internalNext();
+                this.message = String.format("Query on %s with params minid '%s' maxid '%s' excludeKeyPatterns %s conditions %s.",
+                        tmd.getName(), minId, maxId, excludeKeyPatterns, conditions);
+                if (LOG.isDebugEnabled()) {
+                    callstack = new Exception("call stack");
+                }
+            } finally {
+                this.elapsed += (System.currentTimeMillis() - start);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public void remove() {
+            throw new RuntimeException("remove not supported");
+        }
+
+        @Override
+        public RDBRow next() {
+            RDBRow result = next;
+            if (next != null) {
+                next = internalNext();
+                this.cnt += 1;
+                return result;
+            } else {
+                throw new NoSuchElementException("ResultSet exhausted");
+            }
+        }
+
+        private RDBRow internalNext() {
+            long start = System.currentTimeMillis();
+            try {
+                if (this.rs.next()) {
+                    String id = getIdFromRS(this.tmd, this.rs, 1);
+                    long modified = readLongFromResultSet(this.rs, 2);
+                    long modcount = readLongFromResultSet(this.rs, 3);
+                    long cmodcount = readLongFromResultSet(this.rs, 4);
+                    Long hasBinary = readLongOrNullFromResultSet(this.rs, 5);
+                    Boolean deletedOnce = readBooleanOrNullFromResultSet(this.rs, 6);
+                    String data = this.rs.getString(7);
+                    byte[] bdata = this.rs.getBytes(8);
+                    return new RDBRow(id, hasBinary, deletedOnce, modified, modcount, cmodcount, data, bdata);
+                } else {
+                    this.rs = closeResultSet(this.rs);
+                    this.stmt = closeStatement(this.stmt);
+                    this.connection.commit();
+                    internalClose();
+                    return null;
+                }
+            } catch (SQLException ex) {
+                LOG.debug("iterating through result set", ex);
+                throw new RuntimeException(ex);
+            } finally {
+                this.elapsed += (System.currentTimeMillis() - start);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            internalClose();
+        }
+
+        @Override
+        public void finalize() throws Throwable {
+            try {
+                if (this.connection != null) {
+                    if (this.callstack != null) {
+                        LOG.error("finalizing unclosed " + this + "; check caller", this.callstack);
+                    } else {
+                        LOG.error("finalizing unclosed " + this);
+                    }
+                }
+            } finally {
+                super.finalize();
+            }
+        }
+
+        private void internalClose() {
+            this.rs = closeResultSet(this.rs);
+            this.stmt = closeStatement(this.stmt);
+            this.ch.closeConnection(this.connection);
+            this.connection = null;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(this.message + " -> " + this.cnt + " results in " + elapsed + "ms");
+            }
+        }
+    }
+
+    @Nonnull
+    private PreparedStatement prepareQuery(Connection connection, RDBTableMetaData tmd, String columns, String minId, String maxId,
+            List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit, String sortBy) throws SQLException {
+
+        StringBuilder selectClause = new StringBuilder();
+
+        if (limit != Integer.MAX_VALUE && this.dbInfo.getFetchFirstSyntax() == FETCHFIRSTSYNTAX.TOP) {
+            selectClause.append("TOP " + limit + " ");
+        }
+
+        selectClause.append(columns + " from " + tmd.getName());
+
+        String whereClause = buildWhereClause(minId, maxId, excludeKeyPatterns, conditions);
+
+        StringBuilder query = new StringBuilder();
+        query.append("select ").append(selectClause);
+
+        if (whereClause.length() != 0) {
+            query.append(" where ").append(whereClause);
+        }
+
+        if (sortBy != null) {
+            query.append(" order by ID");
+        }
+
+        if (limit != Integer.MAX_VALUE) {
+            switch (this.dbInfo.getFetchFirstSyntax()) {
+                case LIMIT:
+                    query.append(" LIMIT " + limit);
+                    break;
+                case FETCHFIRST:
+                    query.append(" FETCH FIRST " + limit + " ROWS ONLY");
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        PreparedStatement stmt = connection.prepareStatement(query.toString());
+
+        int si = 1;
+        if (minId != null) {
+            setIdInStatement(tmd, stmt, si++, minId);
+        }
+        if (maxId != null) {
+            setIdInStatement(tmd, stmt, si++, maxId);
+        }
+        for (String keyPattern : excludeKeyPatterns) {
+            setIdInStatement(tmd, stmt, si++, keyPattern);
+        }
+        for (QueryCondition cond : conditions) {
+            stmt.setLong(si++, cond.getValue());
+        }
+        if (limit != Integer.MAX_VALUE) {
+            stmt.setFetchSize(limit);
+        }
+        return stmt;
+    }
+
     public List<RDBRow> read(Connection connection, RDBTableMetaData tmd, Collection<String> allKeys) throws SQLException {
 
         List<RDBRow> rows = new ArrayList<RDBRow>();
@@ -724,6 +891,7 @@ public class RDBDocumentStoreJDBC {
         tmp.put(MODIFIED, "MODIFIED");
         tmp.put(NodeDocument.HAS_BINARY_FLAG, "HASBINARY");
         tmp.put(NodeDocument.DELETED_ONCE, "DELETEDONCE");
+        tmp.put(COLLISIONSMODCOUNT, "CMODCOUNT");
         INDEXED_PROP_MAPPING = Collections.unmodifiableMap(tmp);
     }
 
@@ -822,7 +990,7 @@ public class RDBDocumentStoreJDBC {
 
     private static final Integer INT_FALSE = 0;
     private static final Integer INT_TRUE = 1;
-    
+
     @CheckForNull
     private static Integer deletedOnceAsNullOrInteger(Boolean b) {
         return b == null ? null : (b.booleanValue() ? INT_TRUE : INT_FALSE);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java?rev=1788476&r1=1788475&r2=1788476&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java Fri Mar 24 15:37:24 2017
@@ -18,10 +18,10 @@ package org.apache.jackrabbit.oak.plugin
 
 import static com.google.common.collect.Iterables.filter;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -30,15 +30,16 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType;
 import org.apache.jackrabbit.oak.plugins.document.VersionGCSupport;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QueryCondition;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
 
 import com.google.common.base.Predicate;
-import com.google.common.collect.AbstractIterator;
 
 /**
  * RDB specific version of {@link VersionGCSupport} which uses an extended query
  * interface to fetch required {@link NodeDocument}s.
  */
 public class RDBVersionGCSupport extends VersionGCSupport {
+
     private RDBDocumentStore store;
 
     public RDBVersionGCSupport(RDBDocumentStore store) {
@@ -52,62 +53,22 @@ public class RDBVersionGCSupport extends
         conditions.add(new QueryCondition(NodeDocument.DELETED_ONCE, "=", 1));
         conditions.add(new QueryCondition(NodeDocument.MODIFIED_IN_SECS, "<", NodeDocument.getModifiedInSecs(toModified)));
         conditions.add(new QueryCondition(NodeDocument.MODIFIED_IN_SECS, ">=", NodeDocument.getModifiedInSecs(fromModified)));
-        return getIterator(RDBDocumentStore.EMPTY_KEY_PATTERN, conditions);
+        return store.queryAsIterable(Collection.NODES, null, null, RDBDocumentStore.EMPTY_KEY_PATTERN, conditions, Integer.MAX_VALUE, null);
     }
 
-    private Iterable<NodeDocument> getSplitDocuments() {
+    @Override
+    protected Iterable<NodeDocument> identifyGarbage(final Set<SplitDocType> gcTypes, final long oldestRevTimeStamp) {
         List<QueryCondition> conditions = Collections.emptyList();
         // absent support for SDTYPE as indexed property: exclude those
         // documents from the query which definitively aren't split documents
         List<String> excludeKeyPatterns = Arrays.asList("_:/%", "__:/%", "___:/%");
-        return getIterator(excludeKeyPatterns, conditions);
-    }
-
-    @Override
-    protected Iterable<NodeDocument> identifyGarbage(final Set<SplitDocType> gcTypes, final long oldestRevTimeStamp) {
-        return filter(getSplitDocuments(), new Predicate<NodeDocument>() {
+        Iterable<NodeDocument> it = store.queryAsIterable(Collection.NODES, null, null, excludeKeyPatterns, conditions,
+                Integer.MAX_VALUE, null);
+        return CloseableIterable.wrap(filter(it, new Predicate<NodeDocument>() {
             @Override
             public boolean apply(NodeDocument doc) {
                 return gcTypes.contains(doc.getSplitDocType()) && doc.hasAllRevisionLessThan(oldestRevTimeStamp);
             }
-        });
-    }
-
-    private Iterable<NodeDocument> getIterator(final List<String> excludeKeyPatterns, final List<QueryCondition> conditions) {
-        return new Iterable<NodeDocument>() {
-            @Override
-            public Iterator<NodeDocument> iterator() {
-                return new AbstractIterator<NodeDocument>() {
-
-                    private static final int BATCH_SIZE = 100;
-                    private String startId = NodeDocument.MIN_ID_VALUE;
-                    private Iterator<NodeDocument> batch = nextBatch();
-
-                    @Override
-                    protected NodeDocument computeNext() {
-                        // read next batch if necessary
-                        if (!batch.hasNext()) {
-                            batch = nextBatch();
-                        }
-
-                        NodeDocument doc;
-                        if (batch.hasNext()) {
-                            doc = batch.next();
-                            // remember current id
-                            startId = doc.getId();
-                        } else {
-                            doc = endOfData();
-                        }
-                        return doc;
-                    }
-
-                    private Iterator<NodeDocument> nextBatch() {
-                        List<NodeDocument> result = store.query(Collection.NODES, startId, NodeDocument.MAX_ID_VALUE,
-                                excludeKeyPatterns, conditions, BATCH_SIZE);
-                        return result.iterator();
-                    }
-                };
-            }
-        };
+        }), (Closeable) it);
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java?rev=1788476&r1=1788475&r2=1788476&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBCTest.java Fri Mar 24 15:37:24 2017
@@ -25,28 +25,44 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
+import java.io.Closeable;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.sql.DataSource;
+
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
 import org.apache.jackrabbit.oak.plugins.document.AbstractDocumentStoreTest;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QueryCondition;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.RDBTableMetaData;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import ch.qos.logback.classic.Level;
+
 /**
  * Tests checking certain JDBC related features.
  */
@@ -56,6 +72,9 @@ public class RDBDocumentStoreJDBCTest ex
     private RDBDocumentStoreDB dbInfo;
     private static final Logger LOG = LoggerFactory.getLogger(RDBDocumentStoreJDBCTest.class);
 
+    @Rule
+    public TestName name= new TestName();
+
     public RDBDocumentStoreJDBCTest(DocumentStoreFixture dsf) {
         super(dsf);
         assumeTrue(super.rdbDataSource != null);
@@ -263,6 +282,202 @@ public class RDBDocumentStoreJDBCTest ex
         }
     }
 
+    private class MyConnectionHandler extends RDBConnectionHandler {
+
+        public AtomicInteger cnt = new AtomicInteger();
+
+        public MyConnectionHandler(DataSource ds) {
+            super(ds);
+        }
+
+        @Override
+        public Connection getROConnection() throws SQLException {
+            cnt.incrementAndGet();
+            return super.getROConnection();
+        }
+
+        @Override
+        public Connection getRWConnection() throws SQLException {
+            throw new RuntimeException();
+        }
+
+        @Override
+        public void closeConnection(Connection c) {
+            super.closeConnection(c);
+            cnt.decrementAndGet();
+        }
+    }
+
+    @Test
+    public void queryIteratorNotStartedTest() throws SQLException {
+        insertTestResource(this.getClass().getName() + "." + name.getMethodName());
+
+        MyConnectionHandler ch = new MyConnectionHandler(super.rdbDataSource);
+        RDBTableMetaData tmd = ((RDBDocumentStore) super.ds).getTable(Collection.NODES);
+        List<QueryCondition> conditions = Collections.emptyList();
+
+        Iterator<RDBRow> qi = jdbc.queryAsIterator(ch, tmd, null, null, RDBDocumentStore.EMPTY_KEY_PATTERN, conditions,
+                Integer.MAX_VALUE, null);
+        assertTrue(qi instanceof Closeable);
+        assertEquals(1, ch.cnt.get());
+        Utils.closeIfCloseable(qi);
+        assertEquals(0, ch.cnt.get());
+    }
+
+    @Test
+    public void queryIteratorConsumedTest() throws SQLException {
+        insertTestResource(this.getClass().getName() + "." + name.getMethodName());
+
+        LogCustomizer customLogs = LogCustomizer.forLogger(RDBDocumentStoreJDBC.class.getName()).enable(Level.DEBUG)
+                .contains("Query on ").create();
+        customLogs.starting();
+
+        MyConnectionHandler ch = new MyConnectionHandler(super.rdbDataSource);
+        RDBTableMetaData tmd = ((RDBDocumentStore) super.ds).getTable(Collection.NODES);
+        List<QueryCondition> conditions = Collections.emptyList();
+
+        try {
+            Iterator<RDBRow> qi = jdbc.queryAsIterator(ch, tmd, null, null, RDBDocumentStore.EMPTY_KEY_PATTERN, conditions,
+                    Integer.MAX_VALUE, null);
+            assertTrue(qi instanceof Closeable);
+            assertEquals(1, ch.cnt.get());
+            while (qi.hasNext()) {
+                qi.next();
+            }
+            assertEquals(0, ch.cnt.get());
+            assertEquals("should have a DEBUG level log entry", 1, customLogs.getLogs().size());
+        } finally {
+            customLogs.finished();
+            customLogs = null;
+        }
+    }
+
+    @Test
+    public void queryIteratorNotConsumedTest() throws SQLException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+        LogCustomizer customLogs = LogCustomizer.forLogger(RDBDocumentStoreJDBC.class.getName()).enable(Level.DEBUG).contains("finalizing unclosed").create();
+        customLogs.starting();
+
+        insertTestResource(this.getClass().getName() + "." + name.getMethodName());
+
+        MyConnectionHandler ch = new MyConnectionHandler(super.rdbDataSource);
+        RDBTableMetaData tmd = ((RDBDocumentStore) super.ds).getTable(Collection.NODES);
+        List<QueryCondition> conditions = Collections.emptyList();
+        Iterator<RDBRow> qi = jdbc.queryAsIterator(ch, tmd, null, null, RDBDocumentStore.EMPTY_KEY_PATTERN, conditions,
+                Integer.MAX_VALUE, null);
+        assertTrue(qi instanceof Closeable);
+        assertEquals(1, ch.cnt.get());
+        Method fin = qi.getClass().getDeclaredMethod("finalize");
+
+        try {
+            fin.setAccessible(true);
+            fin.invoke(qi);
+
+            assertTrue("finalizing non-consumed iterator should generate log entry", customLogs.getLogs().size() >= 1);
+        } finally {
+            Utils.closeIfCloseable(qi);
+            fin.setAccessible(false);
+            customLogs.finished();
+        }
+    }
+
+    @Test
+    public void queryCountTest() throws SQLException {
+        insertTestResource(this.getClass().getName() + "." + name.getMethodName());
+
+        Connection con = super.rdbDataSource.getConnection();
+        try {
+            con.setReadOnly(true);
+            RDBTableMetaData tmd = ((RDBDocumentStore) super.ds).getTable(Collection.NODES);
+            List<QueryCondition> conditions = Collections.emptyList();
+            long cnt = jdbc.getLong(con, tmd, "count(*)", null, null, RDBDocumentStore.EMPTY_KEY_PATTERN, conditions);
+            assertTrue(cnt > 0);
+        } finally {
+            con.close();
+        }
+    }
+
+    @Test
+    public void queryMinLastModifiedTest() throws SQLException {
+        String baseName = this.getClass().getName() + "." + name.getMethodName();
+
+        long magicValue = (long)(Math.random() * 100000);
+
+        String baseNameNullModified = baseName + "-1";
+        super.ds.remove(Collection.NODES, baseNameNullModified);
+        UpdateOp op = new UpdateOp(baseNameNullModified, true);
+        op.set(RDBDocumentStore.COLLISIONSMODCOUNT, magicValue);
+        op.set(NodeDocument.DELETED_ONCE, true);
+        assertTrue(super.ds.create(Collection.NODES, Collections.singletonList(op)));
+        removeMe.add(baseNameNullModified);
+
+        String baseName10Modified = baseName + "-2";
+        super.ds.remove(Collection.NODES, baseName10Modified);
+        op = new UpdateOp(baseName10Modified, true);
+        op.set(RDBDocumentStore.COLLISIONSMODCOUNT, magicValue);
+        op.set(NodeDocument.MODIFIED_IN_SECS, 10);
+        op.set(NodeDocument.DELETED_ONCE, true);
+        assertTrue(super.ds.create(Collection.NODES, Collections.singletonList(op)));
+        removeMe.add(baseName10Modified);
+
+        String baseName20Modified = baseName + "-3";
+        super.ds.remove(Collection.NODES, baseName20Modified);
+        op = new UpdateOp(baseName20Modified, true);
+        op.set(RDBDocumentStore.COLLISIONSMODCOUNT, magicValue);
+        op.set(NodeDocument.MODIFIED_IN_SECS, 20);
+        op.set(NodeDocument.DELETED_ONCE, true);
+        assertTrue(super.ds.create(Collection.NODES, Collections.singletonList(op)));
+        removeMe.add(baseName20Modified);
+
+        String baseName5ModifiedNoDeletedOnce = baseName + "-4";
+        super.ds.remove(Collection.NODES, baseName5ModifiedNoDeletedOnce);
+        op = new UpdateOp(baseName5ModifiedNoDeletedOnce, true);
+        op.set(RDBDocumentStore.COLLISIONSMODCOUNT, magicValue);
+        op.set(NodeDocument.MODIFIED_IN_SECS, 5);
+        assertTrue(super.ds.create(Collection.NODES, Collections.singletonList(op)));
+        removeMe.add(baseName5ModifiedNoDeletedOnce);
+
+        String selector = "min(MODIFIED)";
+        LogCustomizer customLogs = LogCustomizer.forLogger(RDBDocumentStoreJDBC.class.getName()).enable(Level.DEBUG)
+                .contains("Aggregate query").contains(selector).create();
+        customLogs.starting();
+        Connection con = super.rdbDataSource.getConnection();
+        try {
+            con.setReadOnly(true);
+            RDBTableMetaData tmd = ((RDBDocumentStore) super.ds).getTable(Collection.NODES);
+            List<QueryCondition> conditions = new ArrayList<QueryCondition>();
+            conditions.add(new QueryCondition(RDBDocumentStore.COLLISIONSMODCOUNT, "=", magicValue));
+            long min = jdbc.getLong(con, tmd, selector, null, null, RDBDocumentStore.EMPTY_KEY_PATTERN, conditions);
+            assertEquals(5, min);
+            con.commit();
+        } finally {
+            con.close();
+            assertEquals("should have a DEBUG level log entry", 1, customLogs.getLogs().size());
+            customLogs.finished();
+            customLogs = null;
+        }
+
+        con = super.rdbDataSource.getConnection();
+        try {
+            con.setReadOnly(true);
+            RDBTableMetaData tmd = ((RDBDocumentStore) super.ds).getTable(Collection.NODES);
+            List<QueryCondition> conditions = new ArrayList<QueryCondition>();
+            conditions.add(new QueryCondition(RDBDocumentStore.COLLISIONSMODCOUNT, "=", magicValue));
+            conditions.add(new QueryCondition(NodeDocument.DELETED_ONCE, "=", 1));
+            long min = jdbc.getLong(con, tmd, selector, null, null, RDBDocumentStore.EMPTY_KEY_PATTERN, conditions);
+            assertEquals(10, min);
+            con.commit();
+        } finally {
+            con.close();
+        }
+}
+
+    private void insertTestResource(String id) {
+        super.ds.remove(Collection.NODES, id);
+        UpdateOp op = new UpdateOp(id, true);
+        removeMe.add(id);
+        assertTrue(super.ds.create(Collection.NODES, Collections.singletonList(op)));
+    }
+
     private static boolean isSuccess(int result) {
         return result == 1 || result == Statement.SUCCESS_NO_INFO;
     }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreTest.java?rev=1788476&r1=1788475&r2=1788476&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreTest.java Fri Mar 24 15:37:24 2017
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugin
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,6 +31,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QueryCondition;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.junit.Test;
 
 public class RDBDocumentStoreTest extends AbstractDocumentStoreTest {
@@ -69,10 +71,11 @@ public class RDBDocumentStoreTest extend
     @Test
     public void testRDBQueryKeyPatterns() {
         if (ds instanceof RDBDocumentStore) {
+            int cnt = 10;
             RDBDocumentStore rds = (RDBDocumentStore) ds;
             // create ten documents
             String base = this.getClass().getName() + ".testRDBQuery-";
-            for (int i = 0; i < 10; i++) {
+            for (int i = 0; i < cnt; i++) {
                 // every second is a "regular" path
                 String id = "1:" + (i % 2 == 1 ? "p" : "") + "/" + base + i;
                 UpdateOp up = new UpdateOp(id, true);
@@ -90,6 +93,29 @@ public class RDBDocumentStoreTest extend
                     assertTrue(d.getId().startsWith("1:p"));
                 }
             }
+
+            Iterable<NodeDocument> it = rds.queryAsIterable(Collection.NODES, NodeDocument.MIN_ID_VALUE, NodeDocument.MAX_ID_VALUE,
+                    Arrays.asList("_:/%", "__:/%", "___:/%"), conditions, Integer.MAX_VALUE, null);
+            assertTrue(it instanceof Closeable);
+
+            int c1 = 0, c2 = 0;
+            for (NodeDocument d : it) {
+                if (base.equals(d.get("_test"))) {
+                    assertTrue(d.getId().startsWith("1:p"));
+                    c1 += 1;
+                }
+            }
+            // check that getting the iterator twice works
+            for (NodeDocument d : it) {
+                if (base.equals(d.get("_test"))) {
+                    assertTrue(d.getId().startsWith("1:p"));
+                    c2 += 1;
+                }
+            }
+            assertEquals(cnt / 2, c1);
+            assertEquals(cnt / 2, c2);
+
+            Utils.closeIfCloseable(it);
         }
     }
 }