You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2019/04/05 12:58:12 UTC
[lucene-solr] branch branch_8x updated: SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression
This is an automated email from the ASF dual-hosted git repository.
jbernste pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new e875149 SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression
e875149 is described below
commit e87514909bcceb934f9a79150a720898e52d0558
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Fri Apr 5 08:52:02 2019 -0400
SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression
---
.../solr/client/solrj/io/stream/JDBCStream.java | 24 +++++++++++++++++-----
.../client/solrj/io/stream/JDBCStreamTest.java | 5 ++++-
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
index 4081035..35d23eb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
@@ -138,6 +138,7 @@ public class JDBCStream extends TupleStream implements Expressible {
private String connectionUrl;
private String sqlQuery;
private StreamComparator definedSort;
+ private int fetchSize;
// Internal
private Connection connection;
@@ -153,7 +154,7 @@ public class JDBCStream extends TupleStream implements Expressible {
}
public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) throws IOException {
- init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName);
+ init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName, 5000);
}
public JDBCStream(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -163,7 +164,9 @@ public class JDBCStream extends TupleStream implements Expressible {
StreamExpressionNamedParameter sqlQueryExpression = factory.getNamedOperand(expression, "sql");
StreamExpressionNamedParameter definedSortExpression = factory.getNamedOperand(expression, SORT);
StreamExpressionNamedParameter driverClassNameExpression = factory.getNamedOperand(expression, "driver");
-
+ StreamExpressionNamedParameter fetchSizeExpression = factory.getNamedOperand(expression, "fetchSize");
+
+
// Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
if(expression.getParameters().size() != namedParams.size()){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found", expression));
@@ -177,6 +180,12 @@ public class JDBCStream extends TupleStream implements Expressible {
}
}
+ int fetchSize = 5000;
+ if(null != fetchSizeExpression && fetchSizeExpression.getParameter() instanceof StreamExpressionValue){
+ String fetchSizeString = ((StreamExpressionValue)fetchSizeExpression.getParameter()).getValue();
+ fetchSize = Integer.parseInt(fetchSizeString);
+ }
+
// connectionUrl, required
String connectionUrl = null;
if(null != connectionUrlExpression && connectionUrlExpression.getParameter() instanceof StreamExpressionValue){
@@ -211,15 +220,16 @@ public class JDBCStream extends TupleStream implements Expressible {
}
// We've got all the required items
- init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass);
+ init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass, fetchSize);
}
- private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) {
+ private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName, int fetchSize) {
this.connectionUrl = connectionUrl;
this.sqlQuery = sqlQuery;
this.definedSort = definedSort;
this.connectionProperties = connectionProperties;
this.driverClassName = driverClassName;
+ this.fetchSize = fetchSize;
}
public void setStreamContext(StreamContext context) {
@@ -267,6 +277,7 @@ public class JDBCStream extends TupleStream implements Expressible {
try{
resultSet = statement.executeQuery(sqlQuery);
+ resultSet.setFetchSize(fetchSize);
} catch (SQLException e) {
throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'.\n"
+ e.getMessage(), sqlQuery, connectionUrl), e);
@@ -531,7 +542,10 @@ public class JDBCStream extends TupleStream implements Expressible {
// sql
expression.addParameter(new StreamExpressionNamedParameter("sql", sqlQuery));
-
+
+ // fetchSize
+ expression.addParameter(new StreamExpressionNamedParameter("fetchSize", Integer.toString(fetchSize)));
+
// sort
expression.addParameter(new StreamExpressionNamedParameter(SORT, definedSort.toExpression(factory)));
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index a44de5b..d6ac88d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
@@ -329,7 +330,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
+ " rating_f as rating"
+ " ),"
+ " select("
- + " jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+ + " jdbc(fetchSize=300, connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+ " ID as personId,"
+ " NAME as personName,"
+ " COUNTRY_NAME as country"
@@ -339,6 +340,8 @@ public class JDBCStreamTest extends SolrCloudTestCase {
stream = factory.constructStream(expression);
+ String expr = ((Expressible)stream).toExpression(factory).toString();
+ assertTrue(expr.contains("fetchSize=300"));
stream.setStreamContext(streamContext);
tuples = getTuples(stream);