Posted to commits@lucene.apache.org by ab...@apache.org on 2020/05/14 16:47:23 UTC

[lucene-solr] branch jira/solr-14470 updated: SOLR-14470: Cannot re-open streams once they are closed, need to re-create them.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14470
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14470 by this push:
     new f1765f1  SOLR-14470: Cannot re-open streams once they are closed, need to re-create them.
f1765f1 is described below

commit f1765f11738ab2ccb97e75fbfea1e9f53e12d290
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu May 14 18:46:50 2020 +0200

    SOLR-14470: Cannot re-open streams once they are closed, need to re-create them.
---
 .../apache/solr/handler/export/ExportWriter.java   | 30 ++++++++++++++--------
 .../solr/handler/export/TestExportWriter.java      | 21 ++++++++++++++-
 2 files changed, 39 insertions(+), 12 deletions(-)
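
In plain terms: a TupleStream, like most Closeable resources, cannot be re-opened
after close(), so ExportWriter now keeps the parsed StreamExpression instead of a
single TupleStream field and constructs a fresh stream each time one is needed.
A minimal sketch of that pattern, built only from the streaming API types used in
the diff below (the class name and runOnce() helper are illustrative, not part of
the patch):

    import java.io.IOException;

    import org.apache.solr.client.solrj.io.Tuple;
    import org.apache.solr.client.solrj.io.stream.StreamContext;
    import org.apache.solr.client.solrj.io.stream.TupleStream;
    import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
    import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
    import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

    /** Sketch: keep the parsed expression, build a fresh stream per use. */
    class ExpressionRunner {
      private final StreamFactory factory;
      private final StreamExpression expression; // stored instead of a TupleStream

      ExpressionRunner(StreamFactory factory, String expr) {
        this.factory = factory;
        this.expression = StreamExpressionParser.parse(expr); // parse once
      }

      /** Construct, use, and close a fresh TupleStream on every call. */
      void runOnce(StreamContext context) throws IOException {
        TupleStream stream = factory.constructStream(expression);
        stream.setStreamContext(context);
        stream.open();
        try {
          for (;;) {
            Tuple t = stream.read();
            if (t == null || t.EOF) {
              break;
            }
            // ... consume the tuple ...
          }
        } finally {
          stream.close(); // once closed, this instance cannot be re-opened
        }
      }
    }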

diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
index b386136..c0449cb 100644
--- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java
@@ -110,8 +110,8 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
   final SolrQueryRequest req;
   final SolrQueryResponse res;
   final StreamContext initialStreamContext;
+  StreamExpression streamExpression;
   StreamContext streamContext;
-  TupleStream tupleStream;
   FieldWriter[] fieldWriters;
   int totalHits = 0;
   FixedBitSet[] sets = null;
@@ -339,13 +339,12 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
     if (expr != null) {
       StreamFactory streamFactory = initialStreamContext.getStreamFactory();
       try {
-        StreamExpression streamExpression = StreamExpressionParser.parse(expr);
-        if (streamFactory.isEvaluator(streamExpression)) {
-          StreamExpression tupleExpression = new StreamExpression(StreamParams.TUPLE);
-          tupleExpression.addParameter(new StreamExpressionNamedParameter(StreamParams.RETURN_VALUE, streamExpression));
-          tupleStream = streamFactory.constructStream(tupleExpression);
+        StreamExpression expression = StreamExpressionParser.parse(expr);
+        if (streamFactory.isEvaluator(expression)) {
+          streamExpression = new StreamExpression(StreamParams.TUPLE);
+          streamExpression.addParameter(new StreamExpressionNamedParameter(StreamParams.RETURN_VALUE, expression));
         } else {
-          tupleStream = streamFactory.constructStream(streamExpression);
+          streamExpression = expression;
         }
       } catch (Exception e) {
         writeException(e, writer, true);
@@ -363,7 +362,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
       streamContext.setObjectCache(initialStreamContext.getObjectCache());
       streamContext.put("core", req.getCore().getName());
       streamContext.put("solr-core", req.getCore());
-      tupleStream.setStreamContext(streamContext);
     }
 
     writer.writeMap(m -> {
@@ -373,12 +371,18 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         mw.put("docs", (IteratorWriter) iw -> writeDocs(req, iw, sort));
       });
     });
-    if (tupleStream != null) {
-      tupleStream.close();
+    if (streamContext != null) {
       streamContext = null;
     }
   }
 
+  private TupleStream createTupleStream() throws Exception {
+    StreamFactory streamFactory = initialStreamContext.getStreamFactory();
+    TupleStream tupleStream = streamFactory.constructStream(streamExpression);
+    tupleStream.setStreamContext(streamContext);
+    return tupleStream;
+  }
+
   protected void identifyLowestSortingUnexportedDocs(List<LeafReaderContext> leaves, SortDoc sortDoc, SortQueue queue) throws IOException {
     queue.reset();
     SortDoc top = queue.top();
@@ -410,14 +414,18 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
 
   protected void addDocsToItemWriter(List<LeafReaderContext> leaves, IteratorWriter.ItemWriter writer, SortDoc[] docsToExport, int outDocsIndex) throws IOException {
     try {
-      if (tupleStream != null) {
+      if (streamExpression != null) {
         streamContext.put(DOCS_KEY, docsToExport);
         streamContext.put(DOCS_INDEX_KEY, new int[] {outDocsIndex});
         streamContext.put(EXPORT_WRITER_KEY, this);
         streamContext.put(LEAF_READERS_KEY, leaves);
+        TupleStream tupleStream = createTupleStream();
         tupleStream.open();
         for (;;) {
           final Tuple t = tupleStream.read();
+          if (t == null) {
+            break;
+          }
           if (t.EOF) {
             break;
           }
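
For context on the ExportWriter changes above: the export writer emits documents
in batches, so addDocsToItemWriter() can run several times for a single request.
With the old shared tupleStream field, a later batch could end up calling open()
on a stream that had already been closed, which streaming-expression streams do
not support. Simplified (not part of the patch):

    tupleStream.open();   // batch 1
    // ... read tuples ...
    tupleStream.close();
    tupleStream.open();   // batch 2: re-opening a closed stream fails

The new createTupleStream() helper sidesteps this by building a fresh stream from
the saved streamExpression for every batch. The added t == null check is a
separate hardening: it stops the read loop for streams that signal end of input
with null rather than an EOF tuple.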
diff --git a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
index 843e6ca..ff0ff96 100644
--- a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
+++ b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java
@@ -706,11 +706,30 @@ public class TestExportWriter extends SolrTestCaseJ4 {
     validateSort(numDocs);
   }
 
+  private void createLargeIndex() throws Exception {
+    int BATCH_SIZE = 1000;
+    int NUM_BATCHES = 100;
+    SolrInputDocument[] docs = new SolrInputDocument[BATCH_SIZE];
+    for (int i = 0; i < NUM_BATCHES; i++) {
+      for (int j = 0; j < BATCH_SIZE; j++) {
+        docs[j] = new SolrInputDocument(
+            "id", String.valueOf(i * BATCH_SIZE + j),
+            "batch_i_p", String.valueOf(i),
+            "random_i_p", String.valueOf(random().nextInt(BATCH_SIZE)),
+            "sortabledv", TestUtil.randomSimpleString(random(), 3, 5),
+            "small_i_p", String.valueOf((i + j) % 7)
+            );
+      }
+      updateJ(jsonAdd(docs), null);
+    }
+    assertU(commit());
+  }
+
   @Test
   public void testExpr() throws Exception {
     assertU(delQ("*:*"));
     assertU(commit());
-    createIndex();
+    createLargeIndex();
     String resp = h.query(req("q", "*:*", "qt", "/export", "fl", "id", "sort", "id asc", "expr", "top(n=2,input(),sort=\"id desc\")"));
     assertNotNull(resp);
   }
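
The test change exercises exactly that path: the small index built by the old
createIndex() was presumably never large enough to push the /export handler
through more than one batch, so testExpr() did not hit the re-open. The new
createLargeIndex() indexes 100 batches of 1,000 documents (100,000 total), which
should force the export to call createTupleStream() repeatedly. Against a running
node the same request could be issued as (host and collection name are
illustrative):

    curl 'http://localhost:8983/solr/collection1/export?q=*:*&fl=id&sort=id+asc&expr=top(n=2,input(),sort="id desc")'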