You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2019/04/05 12:58:12 UTC

[lucene-solr] branch branch_8x updated: SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression

This is an automated email from the ASF dual-hosted git repository.

jbernste pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new e875149  SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression
e875149 is described below

commit e87514909bcceb934f9a79150a720898e52d0558
Author: Joel Bernstein <jb...@apache.org>
AuthorDate: Fri Apr 5 08:52:02 2019 -0400

    SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression
---
 .../solr/client/solrj/io/stream/JDBCStream.java    | 24 +++++++++++++++++-----
 .../client/solrj/io/stream/JDBCStreamTest.java     |  5 ++++-
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
index 4081035..35d23eb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
@@ -138,6 +138,7 @@ public class JDBCStream extends TupleStream implements Expressible {
   private String connectionUrl;
   private String sqlQuery;
   private StreamComparator definedSort;
+  private int fetchSize;
   
   // Internal
   private Connection connection;
@@ -153,7 +154,7 @@ public class JDBCStream extends TupleStream implements Expressible {
   }
   
   public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) throws IOException {
-    init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName);
+    init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClassName, 5000);
   }
   
   public JDBCStream(StreamExpression expression, StreamFactory factory) throws IOException{
@@ -163,7 +164,9 @@ public class JDBCStream extends TupleStream implements Expressible {
     StreamExpressionNamedParameter sqlQueryExpression = factory.getNamedOperand(expression, "sql");
     StreamExpressionNamedParameter definedSortExpression = factory.getNamedOperand(expression, SORT);
     StreamExpressionNamedParameter driverClassNameExpression = factory.getNamedOperand(expression, "driver");
-    
+    StreamExpressionNamedParameter fetchSizeExpression = factory.getNamedOperand(expression, "fetchSize");
+
+
     // Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
     if(expression.getParameters().size() != namedParams.size()){
       throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found", expression));
@@ -177,6 +180,12 @@ public class JDBCStream extends TupleStream implements Expressible {
       }
     }
 
+    int fetchSize = 5000;
+    if(null != fetchSizeExpression && fetchSizeExpression.getParameter() instanceof StreamExpressionValue){
+      String fetchSizeString = ((StreamExpressionValue)fetchSizeExpression.getParameter()).getValue();
+      fetchSize = Integer.parseInt(fetchSizeString);
+    }
+
     // connectionUrl, required
     String connectionUrl = null;
     if(null != connectionUrlExpression && connectionUrlExpression.getParameter() instanceof StreamExpressionValue){
@@ -211,15 +220,16 @@ public class JDBCStream extends TupleStream implements Expressible {
     }
 
     // We've got all the required items
-    init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass);
+    init(connectionUrl, sqlQuery, definedSort, connectionProperties, driverClass, fetchSize);
   }
     
-  private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName) {
+  private void init(String connectionUrl, String sqlQuery, StreamComparator definedSort, Properties connectionProperties, String driverClassName, int fetchSize) {
     this.connectionUrl = connectionUrl;
     this.sqlQuery = sqlQuery;
     this.definedSort = definedSort;
     this.connectionProperties = connectionProperties;
     this.driverClassName = driverClassName;
+    this.fetchSize = fetchSize;
   }
   
   public void setStreamContext(StreamContext context) {
@@ -267,6 +277,7 @@ public class JDBCStream extends TupleStream implements Expressible {
     
     try{
       resultSet = statement.executeQuery(sqlQuery);
+      resultSet.setFetchSize(fetchSize);
     } catch (SQLException e) {
       throw new IOException(String.format(Locale.ROOT, "Failed to execute sqlQuery '%s' against JDBC connection '%s'.\n"
           + e.getMessage(), sqlQuery, connectionUrl), e);
@@ -531,7 +542,10 @@ public class JDBCStream extends TupleStream implements Expressible {
     
     // sql
     expression.addParameter(new StreamExpressionNamedParameter("sql", sqlQuery));
-    
+
+    // fetchSize
+    expression.addParameter(new StreamExpressionNamedParameter("fetchSize", Integer.toString(fetchSize)));
+
     // sort
     expression.addParameter(new StreamExpressionNamedParameter(SORT, definedSort.toExpression(factory)));
     
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index a44de5b..d6ac88d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
@@ -329,7 +330,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
               + "    rating_f as rating"
               + "  ),"
               + "  select("
-              + "    jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
+              + "    jdbc(fetchSize=300, connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\"),"
               + "    ID as personId,"
               + "    NAME as personName,"
               + "    COUNTRY_NAME as country"
@@ -339,6 +340,8 @@ public class JDBCStreamTest extends SolrCloudTestCase {
 
 
       stream = factory.constructStream(expression);
+      String expr = ((Expressible)stream).toExpression(factory).toString();
+      assertTrue(expr.contains("fetchSize=300"));
       stream.setStreamContext(streamContext);
       tuples = getTuples(stream);