You are viewing a plain text version of this content. The canonical link for it is here.
Posted to torque-dev@db.apache.org by tv...@apache.org on 2018/12/09 14:59:45 UTC

svn commit: r1848523 - in /db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util: BasePeerImpl.java ResultsetSpliterator.java

Author: tv
Date: Sun Dec  9 14:59:45 2018
New Revision: 1848523

URL: http://svn.apache.org/viewvc?rev=1848523&view=rev
Log:
TORQUE-354: Add doSelectAsStream() to BasePeerImpl

Added:
    db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java   (with props)
Modified:
    db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java

Modified: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java
URL: http://svn.apache.org/viewvc/db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java?rev=1848523&r1=1848522&r2=1848523&view=diff
==============================================================================
--- db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java (original)
+++ db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java Sun Dec  9 14:59:45 2018
@@ -32,8 +32,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.StopWatch;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.torque.Column;
@@ -276,15 +280,14 @@ public class BasePeerImpl<T> implements
                     preparedStatement,
                     query.getPreparedStatementReplacements(),
                     0);
-            long startTime = System.currentTimeMillis();
+            StopWatch stopWatch = new StopWatch();
             log.debug("Executing delete " + sql
                     + ", parameters = "
                     + replacements);
 
+            stopWatch.start();
             int affectedRows = preparedStatement.executeUpdate();
-            long queryEndTime = System.currentTimeMillis();
-            log.trace("delete took " + (queryEndTime - startTime)
-                    + " milliseconds");
+            log.trace("Delete took " + stopWatch.getTime() + " milliseconds");
 
             return affectedRows;
         }
@@ -466,14 +469,14 @@ public class BasePeerImpl<T> implements
                 }
                 position++;
             }
-            long startTime = System.currentTimeMillis();
+
+            StopWatch stopWatch = new StopWatch();
             log.debug("Executing insert " + query.toString()
             + " using parameters " + replacementObjects);
 
+            stopWatch.start();
             preparedStatement.executeUpdate();
-            long queryEndTime = System.currentTimeMillis();
-            log.trace("insert took " + (queryEndTime - startTime)
-                    + " milliseconds");
+            log.trace("Insert took " + stopWatch.getTime() + " milliseconds");
         }
         catch (SQLException e)
         {
@@ -676,14 +679,14 @@ public class BasePeerImpl<T> implements
                     preparedStatement,
                     selectQuery.getPreparedStatementReplacements(),
                     0);
-            long startTime = System.currentTimeMillis();
+
+            StopWatch stopWatch = new StopWatch();
             log.debug("Executing insert " + query.toString()
             + " using parameters " + replacements);
 
+            stopWatch.start();
             numberOfInsertedRows = preparedStatement.executeUpdate();
-            long queryEndTime = System.currentTimeMillis();
-            log.trace("insert took " + (queryEndTime - startTime)
-                    + " milliseconds");
+            log.trace("Insert took " + stopWatch.getTime() + " milliseconds");
         }
         catch (SQLException e)
         {
@@ -1003,39 +1006,56 @@ public class BasePeerImpl<T> implements
             final Connection connection)
                     throws TorqueException
     {
+        try (Stream<TT> resultStream = doSelectAsStream(query, mapper, connection))
+        {
+            return resultStream.collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * Selects rows from a database an maps them to objects.
+     * This method returns a stream that <b>must</b> be closed after use.
+     * All resources used by this method will be closed when the stream is
+     * closed.
+     *
+     * @param query the SQL Query to execute, not null.
+     * @param mapper The mapper creating the objects from the resultSet,
+     *        not null.
+     * @param connection the database connection, not null.
+     *
+     * @return The results of the query, not null.
+     *
+     * @throws TorqueException if querying the database fails.
+     */
+    public <TT> Stream<TT> doSelectAsStream(
+            final String query,
+            final RecordMapper<TT> mapper,
+            final Connection connection)
+                    throws TorqueException
+    {
         if (connection == null)
         {
             throw new NullPointerException("connection is null");
         }
 
-        List<TT> result = new ArrayList<>();
-        try (Statement statement = connection.createStatement())
+        try
         {
-            long startTime = System.currentTimeMillis();
+            Statement statement = connection.createStatement();
+            StopWatch stopWatch = new StopWatch();
             log.debug("Executing query " + query);
 
-            try (ResultSet resultSet = statement.executeQuery(query.toString()))
-            {
-                long queryEndTime = System.currentTimeMillis();
-                log.trace("query took " + (queryEndTime - startTime)
-                        + " milliseconds");
+            stopWatch.start();
+            ResultSet resultSet = statement.executeQuery(query.toString());
+            ResultsetSpliterator<TT> spliterator =
+                    new ResultsetSpliterator<>(mapper, null, statement, resultSet);
+            log.trace("Query took " + stopWatch.getTime() + " milliseconds");
 
-                while (resultSet.next())
-                {
-                    TT rowResult = mapper.processRow(resultSet, 0, null);
-                    result.add(rowResult);
-                }
-                long mappingEndTime = System.currentTimeMillis();
-                log.trace("mapping took " + (mappingEndTime - queryEndTime)
-                        + " milliseconds");
-            }
+            return StreamSupport.stream(spliterator, false).onClose(spliterator);
         }
         catch (SQLException e)
         {
             throw ExceptionMapper.getInstance().toTorqueException(e);
         }
-
-        return result;
     }
 
     /**
@@ -1057,6 +1077,48 @@ public class BasePeerImpl<T> implements
             final Connection connection)
                     throws TorqueException
     {
+        try (Stream<TT> resultStream = doSelectAsStream(criteria, mapper, connection))
+        {
+            List<TT> result = resultStream.collect(Collectors.toList());
+
+            if (criteria.isSingleRecord() && result.size() > 1)
+            {
+                throw new TooManyRowsException(
+                        "Criteria expected single Record and "
+                                + "Multiple Records were selected");
+            }
+
+            return result;
+        }
+    }
+
+    /**
+     * Performs a SQL <code>select</code> using a PreparedStatement.
+     * This method returns a stream that <b>must</b> be closed after use.
+     * All resources used by this method will be closed when the stream is
+     * closed.
+     *
+     * @param criteria A Criteria specifying the records to select, not null.
+     * @param mapper The mapper creating the objects from the resultSet,
+     *        not null.
+     * @param connection the database connection for selecting records,
+     *        not null.
+     *
+     * @return The results of the query as a Stream, not null.
+     *
+     * @throws TorqueException Error performing database query.
+     */
+    public <TT> Stream<TT> doSelectAsStream(
+            final Criteria criteria,
+            final RecordMapper<TT> mapper,
+            final Connection connection)
+                    throws TorqueException
+    {
+        if (connection == null)
+        {
+            throw new NullPointerException("connection is null");
+        }
+
         correctBooleans(criteria);
 
         Query query = SqlBuilder.buildQuery(criteria);
@@ -1068,8 +1130,9 @@ public class BasePeerImpl<T> implements
             query.getFromClause().add(new FromElement(tableName));
         }
 
-        try (PreparedStatement statement = connection.prepareStatement(query.toString()))
+        try
         {
+            PreparedStatement statement = connection.prepareStatement(query.toString());
             if (query.getFetchSize() != null)
             {
                 statement.setFetchSize(query.getFetchSize());
@@ -1080,85 +1143,18 @@ public class BasePeerImpl<T> implements
                     query.getPreparedStatementReplacements(),
                     0);
 
-            // Set offset and limit
-            long offset;
-            Database database = Torque.getDatabase(criteria.getDbName());
-            if (database.getAdapter().supportsNativeOffset())
-            {
-                offset = 0; //database takes care of offset
-            }
-            else
-            {
-                offset = criteria.getOffset();
-            }
-
-            long limit;
-            if (database.getAdapter().supportsNativeLimit())
-            {
-                limit = -1; //database takes care of offset
-            }
-            else
-            {
-                if (database.getAdapter().supportsNativeOffset())
-                {
-                    limit = criteria.getLimit();
-                }
-                else
-                {
-                    if (criteria.getLimit() == -1)
-                    {
-                        limit = criteria.getLimit();
-                    }
-                    else
-                    {
-                        limit = offset + criteria.getLimit();
-                    }
-                }
-            }
-
-            long startTime = System.currentTimeMillis();
+            StopWatch stopWatch = new StopWatch();
             log.debug("Executing query " + query
                     + ", parameters = "
                     + replacements);
 
-            try (ResultSet resultSet = statement.executeQuery())
-            {
-                long queryEndTime = System.currentTimeMillis();
-                log.trace("query took " + (queryEndTime - startTime)
-                        + " milliseconds");
-
-                List<TT> result = new ArrayList<>();
-                int rowNumber = 0;
-                while (resultSet.next())
-                {
-                    if (rowNumber < offset)
-                    {
-                        rowNumber++;
-                        continue;
-                    }
-                    if (limit >= 0 && rowNumber >= limit)
-                    {
-                        break;
-                    }
-
-                    TT rowResult = mapper.processRow(resultSet, 0, criteria);
-                    result.add(rowResult);
+            stopWatch.start();
+            ResultSet resultSet = statement.executeQuery();
+            ResultsetSpliterator<TT> spliterator =
+                    new ResultsetSpliterator<>(mapper, criteria, statement, resultSet);
+            log.trace("Query took " + stopWatch.getTime() + " milliseconds");
 
-                    rowNumber++;
-                }
-                long mappingEndTime = System.currentTimeMillis();
-                log.trace("mapping took " + (mappingEndTime - queryEndTime)
-                        + " milliseconds");
-
-                if (criteria.isSingleRecord() && result.size() > 1)
-                {
-                    throw new TooManyRowsException(
-                            "Criteria expected single Record and "
-                                    + "Multiple Records were selected");
-                }
-
-                return result;
-            }
+            return StreamSupport.stream(spliterator, false).onClose(spliterator);
         }
         catch (SQLException e)
         {
@@ -1410,16 +1406,15 @@ public class BasePeerImpl<T> implements
                     preparedStatement,
                     query.getPreparedStatementReplacements(),
                     position - 1);
-            long startTime = System.currentTimeMillis();
+            StopWatch stopWatch = new StopWatch();
             log.debug("Executing update " + query.toString()
             + " using update parameters " + replacementObjects
             + " and query parameters "
             + replacements);
 
+            stopWatch.start();
             int affectedRows = preparedStatement.executeUpdate();
-            long queryEndTime = System.currentTimeMillis();
-            log.trace("update took " + (queryEndTime - startTime)
-                    + " milliseconds");
+            log.trace("Update took " + stopWatch.getTime() + " milliseconds");
 
             return affectedRows;
         }

Added: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java
URL: http://svn.apache.org/viewvc/db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java?rev=1848523&view=auto
==============================================================================
--- db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java (added)
+++ db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java Sun Dec  9 14:59:45 2018
@@ -0,0 +1,159 @@
+package org.apache.torque.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Spliterator;
+import java.util.Spliterators.AbstractSpliterator;
+import java.util.function.Consumer;
+
+import org.apache.torque.Database;
+import org.apache.torque.Torque;
+import org.apache.torque.TorqueException;
+import org.apache.torque.TorqueRuntimeException;
+import org.apache.torque.criteria.Criteria;
+import org.apache.torque.om.mapper.RecordMapper;
+
+/**
+ * Stream support: Encapsulate iteration over a JDBC ResultSet
+ *
+ * @author <a href="mailto:tv@apache.org">Thomas Vandahl</a>
+ */
+public class ResultsetSpliterator<T> extends AbstractSpliterator<T> implements Runnable
+{
+    private final RecordMapper<T> recordMapper;
+    private final Criteria criteria;
+    private final Statement statement;
+    private final ResultSet resultSet;
+
+    private long offset;
+    private long limit;
+    private long rowNumber;
+
+    /**
+     * Constructor
+     *
+     * @param recordMapper a RecordMapper to map ResultSet rows to entities of type
+     *                     <T>
+     * @param criteria     a Criteria
+     * @param statement    the statement that created the ResultSet
+     * @param resultSet    the JDBC result set
+     * @throws TorqueException
+     */
+    public ResultsetSpliterator(RecordMapper<T> recordMapper, Criteria criteria,
+            Statement statement, ResultSet resultSet) throws TorqueException
+    {
+        super(Long.MAX_VALUE, Spliterator.ORDERED);
+
+        this.recordMapper = recordMapper;
+        this.criteria = criteria;
+        this.statement = statement;
+        this.resultSet = resultSet;
+        this.offset = 0; //database takes care of offset
+        this.limit = -1; //database takes care of limit
+        this.rowNumber = 0;
+
+        // Set offset and limit
+        if (criteria != null)
+        {
+            Database database = Torque.getDatabase(criteria.getDbName());
+            if (!database.getAdapter().supportsNativeOffset())
+            {
+                offset = criteria.getOffset();
+            }
+
+            if (!database.getAdapter().supportsNativeLimit())
+            {
+                if (database.getAdapter().supportsNativeOffset())
+                {
+                    limit = criteria.getLimit();
+                }
+                else if (criteria.getLimit() != -1)
+                {
+                    limit = offset + criteria.getLimit();
+                }
+            }
+        }
+    }
+
+    /**
+     * Advance ResultSet and map row to entity <T>
+     *
+     * @see java.util.Spliterator#tryAdvance(java.util.function.Consumer)
+     */
+    @Override
+    public boolean tryAdvance(Consumer<? super T> action)
+    {
+        try
+        {
+            while (resultSet.next())
+            {
+                if (rowNumber < offset)
+                {
+                    rowNumber++;
+                    continue;
+                }
+                if (limit >= 0 && rowNumber >= limit)
+                {
+                    return false;
+                }
+
+                rowNumber++;
+                T result = recordMapper.processRow(resultSet, 0, criteria);
+                action.accept(result);
+                return true;
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new TorqueRuntimeException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Method to be run onClose() of associated stream
+     *
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run()
+    {
+        try
+        {
+            resultSet.close();
+        }
+        catch (SQLException e)
+        {
+            throw new TorqueRuntimeException(e);
+        }
+        try
+        {
+            statement.close();
+        }
+        catch (SQLException e)
+        {
+            throw new TorqueRuntimeException(e);
+        }
+    }
+}

Propchange: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



---------------------------------------------------------------------
To unsubscribe, e-mail: torque-dev-unsubscribe@db.apache.org
For additional commands, e-mail: torque-dev-help@db.apache.org