You are viewing a plain text version of this content. The canonical link for it is here.
Posted to torque-dev@db.apache.org by tv...@apache.org on 2018/12/09 14:59:45 UTC
svn commit: r1848523 - in
/db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util:
BasePeerImpl.java ResultsetSpliterator.java
Author: tv
Date: Sun Dec 9 14:59:45 2018
New Revision: 1848523
URL: http://svn.apache.org/viewvc?rev=1848523&view=rev
Log:
TORQUE-354: Add doSelectAsStream() to BasePeerImpl
Added:
db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java (with props)
Modified:
db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java
Modified: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java
URL: http://svn.apache.org/viewvc/db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java?rev=1848523&r1=1848522&r2=1848523&view=diff
==============================================================================
--- db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java (original)
+++ db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/BasePeerImpl.java Sun Dec 9 14:59:45 2018
@@ -32,8 +32,12 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.torque.Column;
@@ -276,15 +280,14 @@ public class BasePeerImpl<T> implements
preparedStatement,
query.getPreparedStatementReplacements(),
0);
- long startTime = System.currentTimeMillis();
+ StopWatch stopWatch = new StopWatch();
log.debug("Executing delete " + sql
+ ", parameters = "
+ replacements);
+ stopWatch.start();
int affectedRows = preparedStatement.executeUpdate();
- long queryEndTime = System.currentTimeMillis();
- log.trace("delete took " + (queryEndTime - startTime)
- + " milliseconds");
+ log.trace("Delete took " + stopWatch.getTime() + " milliseconds");
return affectedRows;
}
@@ -466,14 +469,14 @@ public class BasePeerImpl<T> implements
}
position++;
}
- long startTime = System.currentTimeMillis();
+
+ StopWatch stopWatch = new StopWatch();
log.debug("Executing insert " + query.toString()
+ " using parameters " + replacementObjects);
+ stopWatch.start();
preparedStatement.executeUpdate();
- long queryEndTime = System.currentTimeMillis();
- log.trace("insert took " + (queryEndTime - startTime)
- + " milliseconds");
+ log.trace("Insert took " + stopWatch.getTime() + " milliseconds");
}
catch (SQLException e)
{
@@ -676,14 +679,14 @@ public class BasePeerImpl<T> implements
preparedStatement,
selectQuery.getPreparedStatementReplacements(),
0);
- long startTime = System.currentTimeMillis();
+
+ StopWatch stopWatch = new StopWatch();
log.debug("Executing insert " + query.toString()
+ " using parameters " + replacements);
+ stopWatch.start();
numberOfInsertedRows = preparedStatement.executeUpdate();
- long queryEndTime = System.currentTimeMillis();
- log.trace("insert took " + (queryEndTime - startTime)
- + " milliseconds");
+ log.trace("Insert took " + stopWatch.getTime() + " milliseconds");
}
catch (SQLException e)
{
@@ -1003,39 +1006,56 @@ public class BasePeerImpl<T> implements
final Connection connection)
throws TorqueException
{
+ try (Stream<TT> resultStream = doSelectAsStream(query, mapper, connection))
+ {
+ return resultStream.collect(Collectors.toList());
+ }
+ }
+
+ /**
+ * Selects rows from a database an maps them to objects.
+ * This method returns a stream that <b>must</b> be closed after use.
+ * All resources used by this method will be closed when the stream is
+ * closed.
+ *
+ * @param query the SQL Query to execute, not null.
+ * @param mapper The mapper creating the objects from the resultSet,
+ * not null.
+ * @param connection the database connection, not null.
+ *
+ * @return The results of the query, not null.
+ *
+ * @throws TorqueException if querying the database fails.
+ */
+ public <TT> Stream<TT> doSelectAsStream(
+ final String query,
+ final RecordMapper<TT> mapper,
+ final Connection connection)
+ throws TorqueException
+ {
if (connection == null)
{
throw new NullPointerException("connection is null");
}
- List<TT> result = new ArrayList<>();
- try (Statement statement = connection.createStatement())
+ try
{
- long startTime = System.currentTimeMillis();
+ Statement statement = connection.createStatement();
+ StopWatch stopWatch = new StopWatch();
log.debug("Executing query " + query);
- try (ResultSet resultSet = statement.executeQuery(query.toString()))
- {
- long queryEndTime = System.currentTimeMillis();
- log.trace("query took " + (queryEndTime - startTime)
- + " milliseconds");
+ stopWatch.start();
+ ResultSet resultSet = statement.executeQuery(query.toString());
+ ResultsetSpliterator<TT> spliterator =
+ new ResultsetSpliterator<>(mapper, null, statement, resultSet);
+ log.trace("Query took " + stopWatch.getTime() + " milliseconds");
- while (resultSet.next())
- {
- TT rowResult = mapper.processRow(resultSet, 0, null);
- result.add(rowResult);
- }
- long mappingEndTime = System.currentTimeMillis();
- log.trace("mapping took " + (mappingEndTime - queryEndTime)
- + " milliseconds");
- }
+ return StreamSupport.stream(spliterator, false).onClose(spliterator);
}
catch (SQLException e)
{
throw ExceptionMapper.getInstance().toTorqueException(e);
}
-
- return result;
}
/**
@@ -1057,6 +1077,48 @@ public class BasePeerImpl<T> implements
final Connection connection)
throws TorqueException
{
+ try (Stream<TT> resultStream = doSelectAsStream(criteria, mapper, connection))
+ {
+ List<TT> result = resultStream.collect(Collectors.toList());
+
+ if (criteria.isSingleRecord() && result.size() > 1)
+ {
+ throw new TooManyRowsException(
+ "Criteria expected single Record and "
+ + "Multiple Records were selected");
+ }
+
+ return result;
+ }
+ }
+
+ /**
+ * Performs a SQL <code>select</code> using a PreparedStatement.
+ * This method returns a stream that <b>must</b> be closed after use.
+ * All resources used by this method will be closed when the stream is
+ * closed.
+ *
+ * @param criteria A Criteria specifying the records to select, not null.
+ * @param mapper The mapper creating the objects from the resultSet,
+ * not null.
+ * @param connection the database connection for selecting records,
+ * not null.
+ *
+ * @return The results of the query as a Stream, not null.
+ *
+ * @throws TorqueException Error performing database query.
+ */
+ public <TT> Stream<TT> doSelectAsStream(
+ final Criteria criteria,
+ final RecordMapper<TT> mapper,
+ final Connection connection)
+ throws TorqueException
+ {
+ if (connection == null)
+ {
+ throw new NullPointerException("connection is null");
+ }
+
correctBooleans(criteria);
Query query = SqlBuilder.buildQuery(criteria);
@@ -1068,8 +1130,9 @@ public class BasePeerImpl<T> implements
query.getFromClause().add(new FromElement(tableName));
}
- try (PreparedStatement statement = connection.prepareStatement(query.toString()))
+ try
{
+ PreparedStatement statement = connection.prepareStatement(query.toString());
if (query.getFetchSize() != null)
{
statement.setFetchSize(query.getFetchSize());
@@ -1080,85 +1143,18 @@ public class BasePeerImpl<T> implements
query.getPreparedStatementReplacements(),
0);
- // Set offset and limit
- long offset;
- Database database = Torque.getDatabase(criteria.getDbName());
- if (database.getAdapter().supportsNativeOffset())
- {
- offset = 0; //database takes care of offset
- }
- else
- {
- offset = criteria.getOffset();
- }
-
- long limit;
- if (database.getAdapter().supportsNativeLimit())
- {
- limit = -1; //database takes care of offset
- }
- else
- {
- if (database.getAdapter().supportsNativeOffset())
- {
- limit = criteria.getLimit();
- }
- else
- {
- if (criteria.getLimit() == -1)
- {
- limit = criteria.getLimit();
- }
- else
- {
- limit = offset + criteria.getLimit();
- }
- }
- }
-
- long startTime = System.currentTimeMillis();
+ StopWatch stopWatch = new StopWatch();
log.debug("Executing query " + query
+ ", parameters = "
+ replacements);
- try (ResultSet resultSet = statement.executeQuery())
- {
- long queryEndTime = System.currentTimeMillis();
- log.trace("query took " + (queryEndTime - startTime)
- + " milliseconds");
-
- List<TT> result = new ArrayList<>();
- int rowNumber = 0;
- while (resultSet.next())
- {
- if (rowNumber < offset)
- {
- rowNumber++;
- continue;
- }
- if (limit >= 0 && rowNumber >= limit)
- {
- break;
- }
-
- TT rowResult = mapper.processRow(resultSet, 0, criteria);
- result.add(rowResult);
+ stopWatch.start();
+ ResultSet resultSet = statement.executeQuery();
+ ResultsetSpliterator<TT> spliterator =
+ new ResultsetSpliterator<>(mapper, criteria, statement, resultSet);
+ log.trace("Query took " + stopWatch.getTime() + " milliseconds");
- rowNumber++;
- }
- long mappingEndTime = System.currentTimeMillis();
- log.trace("mapping took " + (mappingEndTime - queryEndTime)
- + " milliseconds");
-
- if (criteria.isSingleRecord() && result.size() > 1)
- {
- throw new TooManyRowsException(
- "Criteria expected single Record and "
- + "Multiple Records were selected");
- }
-
- return result;
- }
+ return StreamSupport.stream(spliterator, false).onClose(spliterator);
}
catch (SQLException e)
{
@@ -1410,16 +1406,15 @@ public class BasePeerImpl<T> implements
preparedStatement,
query.getPreparedStatementReplacements(),
position - 1);
- long startTime = System.currentTimeMillis();
+ StopWatch stopWatch = new StopWatch();
log.debug("Executing update " + query.toString()
+ " using update parameters " + replacementObjects
+ " and query parameters "
+ replacements);
+ stopWatch.start();
int affectedRows = preparedStatement.executeUpdate();
- long queryEndTime = System.currentTimeMillis();
- log.trace("update took " + (queryEndTime - startTime)
- + " milliseconds");
+ log.trace("Update took " + stopWatch.getTime() + " milliseconds");
return affectedRows;
}
Added: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java
URL: http://svn.apache.org/viewvc/db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java?rev=1848523&view=auto
==============================================================================
--- db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java (added)
+++ db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java Sun Dec 9 14:59:45 2018
@@ -0,0 +1,159 @@
+package org.apache.torque.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Spliterator;
+import java.util.Spliterators.AbstractSpliterator;
+import java.util.function.Consumer;
+
+import org.apache.torque.Database;
+import org.apache.torque.Torque;
+import org.apache.torque.TorqueException;
+import org.apache.torque.TorqueRuntimeException;
+import org.apache.torque.criteria.Criteria;
+import org.apache.torque.om.mapper.RecordMapper;
+
+/**
+ * Stream support: Encapsulate iteration over a JDBC ResultSet
+ *
+ * @author <a href="mailto:tv@apache.org">Thomas Vandahl</a>
+ */
+public class ResultsetSpliterator<T> extends AbstractSpliterator<T> implements Runnable
+{
+ private final RecordMapper<T> recordMapper;
+ private final Criteria criteria;
+ private final Statement statement;
+ private final ResultSet resultSet;
+
+ private long offset;
+ private long limit;
+ private long rowNumber;
+
+ /**
+ * Constructor
+ *
+ * @param recordMapper a RecordMapper to map ResultSet rows to entities of type
+ * <T>
+ * @param criteria a Criteria
+ * @param statement the statement that created the ResultSet
+ * @param resultSet the JDBC result set
+ * @throws TorqueException
+ */
+ public ResultsetSpliterator(RecordMapper<T> recordMapper, Criteria criteria,
+ Statement statement, ResultSet resultSet) throws TorqueException
+ {
+ super(Long.MAX_VALUE, Spliterator.ORDERED);
+
+ this.recordMapper = recordMapper;
+ this.criteria = criteria;
+ this.statement = statement;
+ this.resultSet = resultSet;
+ this.offset = 0; //database takes care of offset
+ this.limit = -1; //database takes care of limit
+ this.rowNumber = 0;
+
+ // Set offset and limit
+ if (criteria != null)
+ {
+ Database database = Torque.getDatabase(criteria.getDbName());
+ if (!database.getAdapter().supportsNativeOffset())
+ {
+ offset = criteria.getOffset();
+ }
+
+ if (!database.getAdapter().supportsNativeLimit())
+ {
+ if (database.getAdapter().supportsNativeOffset())
+ {
+ limit = criteria.getLimit();
+ }
+ else if (criteria.getLimit() != -1)
+ {
+ limit = offset + criteria.getLimit();
+ }
+ }
+ }
+ }
+
+ /**
+ * Advance ResultSet and map row to entity <T>
+ *
+ * @see java.util.Spliterator#tryAdvance(java.util.function.Consumer)
+ */
+ @Override
+ public boolean tryAdvance(Consumer<? super T> action)
+ {
+ try
+ {
+ while (resultSet.next())
+ {
+ if (rowNumber < offset)
+ {
+ rowNumber++;
+ continue;
+ }
+ if (limit >= 0 && rowNumber >= limit)
+ {
+ return false;
+ }
+
+ rowNumber++;
+ T result = recordMapper.processRow(resultSet, 0, criteria);
+ action.accept(result);
+ return true;
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new TorqueRuntimeException(e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Method to be run onClose() of associated stream
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run()
+ {
+ try
+ {
+ resultSet.close();
+ }
+ catch (SQLException e)
+ {
+ throw new TorqueRuntimeException(e);
+ }
+ try
+ {
+ statement.close();
+ }
+ catch (SQLException e)
+ {
+ throw new TorqueRuntimeException(e);
+ }
+ }
+}
Propchange: db/torque/torque4/trunk/torque-runtime/src/main/java/org/apache/torque/util/ResultsetSpliterator.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
---------------------------------------------------------------------
To unsubscribe, e-mail: torque-dev-unsubscribe@db.apache.org
For additional commands, e-mail: torque-dev-help@db.apache.org