You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/01/02 10:18:34 UTC

[13/19] lucene-solr:jira/solr-9854: SOLR-9668: introduce cursorMark='true' for SolrEntityProcessor

SOLR-9668: introduce cursorMark='true' for SolrEntityProcessor


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/cc862d8e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/cc862d8e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/cc862d8e

Branch: refs/heads/jira/solr-9854
Commit: cc862d8e67f32d5447599d265f5d126541ed92c9
Parents: 26ee8e9
Author: Mikhail Khludnev <mk...@apache.org>
Authored: Tue Dec 27 15:34:12 2016 +0300
Committer: Mikhail Khludnev <mk...@apache.org>
Committed: Fri Dec 30 20:50:53 2016 +0300

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../handler/dataimport/SolrEntityProcessor.java | 192 ++++++++++++-------
 .../dataimport/MockSolrEntityProcessor.java     |  18 +-
 .../TestSolrEntityProcessorEndToEnd.java        |  27 ++-
 .../dataimport/TestSolrEntityProcessorUnit.java |  70 +++++++
 5 files changed, 234 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc862d8e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 59dde90..874ac81 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -206,6 +206,8 @@ New Features
 
 * SOLR-9891: Add mkroot command to bin/solr and bin/solr.cmd (Erick Erickson)
 
+* SOLR-9668,SOLR-7197: introduce cursorMark='true' in SolrEntityProcessor (Yegor Kozlov, Raveendra Yerraguntl via Mikhail Khludnev)
+
 Optimizations
 ----------------------
 * SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc862d8e/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
index 5e62731..6d8726f 100644
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
+++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
@@ -16,6 +16,18 @@
  */
 package org.apache.solr.handler.dataimport;
 
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -27,22 +39,12 @@ import org.apache.solr.client.solrj.impl.XMLResponseParser;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CursorMarkParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
-
 /**
  * <p>
  * An implementation of {@link EntityProcessor} which fetches values from a
@@ -139,81 +141,53 @@ public class SolrEntityProcessor extends EntityProcessorBase {
    * The following method changes the rowIterator mutable field. It requires
    * external synchronization. 
    */
-  private void buildIterator() {
+  protected void buildIterator() { // creates rowIterator on first use, then pages it forward via nextPage()
     if (rowIterator != null)  {
       SolrDocumentListIterator documentListIterator = (SolrDocumentListIterator) rowIterator;
       if (!documentListIterator.hasNext() && documentListIterator.hasMoreRows()) {
-        SolrDocumentList solrDocumentList = doQuery(documentListIterator
-            .getStart() + documentListIterator.getSize());
-        if (solrDocumentList != null) {
-          rowIterator = new SolrDocumentListIterator(solrDocumentList);
-        }
+        nextPage(); // current page is consumed and the iterator reports more rows
       }
-    } else  {
-      SolrDocumentList solrDocumentList = doQuery(0);
-      if (solrDocumentList != null) {
-        rowIterator = new SolrDocumentListIterator(solrDocumentList);
-      }
-      return;
+    } else {
+      boolean cursor = Boolean.parseBoolean(context
+          .getResolvedEntityAttribute(CursorMarkParams.CURSOR_MARK_PARAM)); // null or anything but "true" => offset paging
+      rowIterator = !cursor ? new SolrDocumentListIterator(new SolrDocumentList())
+          : new SolrDocumentListCursor(new SolrDocumentList(), CursorMarkParams.CURSOR_MARK_START);
+      nextPage(); // the empty iterator above only selects the paging strategy; this fetches the first page
     }
   }
   
-  protected SolrDocumentList doQuery(int start) {
-    this.queryString = context.getResolvedEntityAttribute(QUERY);
-    if (this.queryString == null) {
-      throw new DataImportHandlerException(
-          DataImportHandlerException.SEVERE,
-          "SolrEntityProcessor: parameter 'query' is required"
-      );
-    }
-
-    String rowsP = context.getResolvedEntityAttribute(CommonParams.ROWS);
-    if (rowsP != null) {
-      rows = Integer.parseInt(rowsP);
-    }
+  protected void nextPage() { // asks the current iterator to query for the page that follows it
+    ((SolrDocumentListIterator)rowIterator).doQuery(); // doQuery() replaces rowIterator when a response arrives
+  }
 
-    String fqAsString = context.getResolvedEntityAttribute(CommonParams.FQ);
-    if (fqAsString != null) {
-      this.filterQueries = fqAsString.split(",");
-    }
+  class SolrDocumentListCursor extends SolrDocumentListIterator { // pages by cursorMark rather than by start offset
+    
+    private final String cursorMark; // sent as the cursorMark parameter of the next query
 
-    String fieldsAsString = context.getResolvedEntityAttribute(CommonParams.FL);
-    if (fieldsAsString != null) {
-      this.fields = fieldsAsString.split(",");
-    }
-    this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT);
-    String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT);
-    if (timeoutAsString != null) {
-      this.timeout = Integer.parseInt(timeoutAsString);
+    public SolrDocumentListCursor(SolrDocumentList solrDocumentList, String cursorMark) {
+      super(solrDocumentList);
+      this.cursorMark = cursorMark;
     }
 
-    SolrQuery solrQuery = new SolrQuery(queryString);
-    solrQuery.setRows(rows);
-    solrQuery.setStart(start);
-    if (fields != null) {
-      for (String field : fields) {
-        solrQuery.addField(field);
+    @Override
+    protected void passNextPage(SolrQuery solrQuery) { // sets cursorMark only: neither start nor timeAllowed
+      String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT);
+      if (timeoutAsString != null) { // a configured timeout is rejected outright in cursor mode
+        throw new DataImportHandlerException(SEVERE,"cursorMark can't be used with timeout");
       }
+      
+      solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
     }
-    solrQuery.setRequestHandler(requestHandler);
-    solrQuery.setFilterQueries(filterQueries);
-    solrQuery.setTimeAllowed(timeout * 1000);
     
-    QueryResponse response = null;
-    try {
-      response = solrClient.query(solrQuery);
-    } catch (SolrServerException | IOException e) {
-      if (ABORT.equals(onError)) {
-        wrapAndThrow(SEVERE, e);
-      } else if (SKIP.equals(onError)) {
-        wrapAndThrow(DataImportHandlerException.SKIP_ROW, e);
-      }
+    @Override
+    protected Iterator<Map<String,Object>> createNextPageIterator(QueryResponse response) { // carries the response's next mark forward
+      return
+          new SolrDocumentListCursor(response.getResults(),
+              response.getNextCursorMark()) ;
     }
-    
-    return response == null ? null : response.getResults();
   }
   
-  private static class SolrDocumentListIterator implements Iterator<Map<String,Object>> {
+  class SolrDocumentListIterator implements Iterator<Map<String,Object>> {
     
     private final int start;
     private final int size;
@@ -230,6 +204,84 @@ public class SolrEntityProcessor extends EntityProcessorBase {
       this.size = solrDocumentList.size();
     }
 
+    protected QueryResponse doQuery() { // runs one page query; returns null when it failed and nothing was rethrown
+      SolrEntityProcessor.this.queryString = context.getResolvedEntityAttribute(QUERY); // attributes are re-resolved on every page
+      if (SolrEntityProcessor.this.queryString == null) {
+        throw new DataImportHandlerException(
+            DataImportHandlerException.SEVERE,
+            "SolrEntityProcessor: parameter 'query' is required"
+        );
+      }
+
+      String rowsP = context.getResolvedEntityAttribute(CommonParams.ROWS);
+      if (rowsP != null) {
+        rows = Integer.parseInt(rowsP);
+      }
+
+      String sortParam = context.getResolvedEntityAttribute(CommonParams.SORT);
+      
+      String fqAsString = context.getResolvedEntityAttribute(CommonParams.FQ);
+      if (fqAsString != null) {
+        SolrEntityProcessor.this.filterQueries = fqAsString.split(","); // comma-separated list of filter queries
+      }
+
+      String fieldsAsString = context.getResolvedEntityAttribute(CommonParams.FL);
+      if (fieldsAsString != null) {
+        SolrEntityProcessor.this.fields = fieldsAsString.split(","); // comma-separated list of fields
+      }
+      SolrEntityProcessor.this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT);
+     
+
+      SolrQuery solrQuery = new SolrQuery(queryString);
+      solrQuery.setRows(rows);
+      
+      if (sortParam!=null) {
+        solrQuery.setParam(CommonParams.SORT, sortParam);
+      }
+      
+      passNextPage(solrQuery); // overridable hook: start offset + timeAllowed here, cursorMark in SolrDocumentListCursor
+      
+      if (fields != null) {
+        for (String field : fields) {
+          solrQuery.addField(field);
+        }
+      }
+      solrQuery.setRequestHandler(requestHandler);
+      solrQuery.setFilterQueries(filterQueries);
+      
+      
+      QueryResponse response = null;
+      try {
+        response = solrClient.query(solrQuery);
+      } catch (SolrServerException | IOException | SolrException e) {
+        if (ABORT.equals(onError)) {
+          wrapAndThrow(SEVERE, e);
+        } else if (SKIP.equals(onError)) {
+          wrapAndThrow(DataImportHandlerException.SKIP_ROW, e);
+        } // any other onError value: the exception is dropped and response stays null
+      }
+      
+      if (response != null) { // on failure the current rowIterator is left untouched
+        SolrEntityProcessor.this.rowIterator = createNextPageIterator(response);
+      }
+      return response;
+    }
+
+    protected Iterator<Map<String,Object>> createNextPageIterator(QueryResponse response) { // factory hook; overridden by SolrDocumentListCursor
+      return new SolrDocumentListIterator(response.getResults());
+    }
+
+    protected void passNextPage(SolrQuery solrQuery) { // offset paging: applies timeAllowed and the next start offset
+      String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT);
+      if (timeoutAsString != null) {
+        SolrEntityProcessor.this.timeout = Integer.parseInt(timeoutAsString);
+      }
+      
+      solrQuery.setTimeAllowed(timeout * 1000); // NOTE(review): presumably seconds -> milliseconds; confirm the unit of 'timeout'
+      
+      solrQuery.setStart(getStart() + getSize()); // next page begins right after this page's start + size
+    }
+    
     @Override
     public boolean hasNext() {
       return solrDocumentIterator.hasNext();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc862d8e/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java
index 4ebca30..42e5f7d 100644
--- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java
+++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/MockSolrEntityProcessor.java
@@ -29,16 +29,28 @@ public class MockSolrEntityProcessor extends SolrEntityProcessor {
   private int queryCount = 0;
 
   private int rows;
+  
+  private int start = 0;
 
   public MockSolrEntityProcessor(List<SolrTestCaseJ4.Doc> docsData, int rows) {
     this.docsData = docsData;
     this.rows = rows;
   }
 
+  //@Override
+  //protected SolrDocumentList doQuery(int start) {
+  //  queryCount++;
+  //  return getDocs(start, rows);
+ // }
+  
   @Override
-  protected SolrDocumentList doQuery(int start) {
-    queryCount++;
-    return getDocs(start, rows);
+  protected void buildIterator() { // builds pages via getDocs() instead of querying a Solr server
+    if (rowIterator==null || (!rowIterator.hasNext() && ((SolrDocumentListIterator)rowIterator).hasMoreRows())){
+      queryCount++; // counts one simulated query per page built
+      SolrDocumentList docs = getDocs(start, rows);
+      rowIterator = new SolrDocumentListIterator(docs);
+      start += docs.size(); // the next page starts after the docs just handed out
+    }
   }
 
   private SolrDocumentList getDocs(int start, int rows) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc862d8e/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java
index 8ef94c0..9e104ee 100644
--- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java
+++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java
@@ -34,6 +34,8 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -179,7 +181,7 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe
     
     try {
       addDocumentsToSolr(generateSolrDocuments(7));
-      runFullImport(generateDIHConfig("query='*:*' fl='id' rows='2'", false));
+      runFullImport(generateDIHConfig("query='*:*' fl='id' rows='2'"+(random().nextBoolean() ?" cursorMark='true' sort='id asc'":""), false));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -252,7 +254,8 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe
     assertQ(req("*:*"), "//result[@numFound='0']");
     
     try {
-      runFullImport(generateDIHConfig("query='bogus:3' rows='2' fl='id,desc' onError='abort'", false));
+      runFullImport(generateDIHConfig("query='bogus:3' rows='2' fl='id,desc' onError='"+
+            (random().nextBoolean() ? "abort" : "justtogetcoverage")+"'", false));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       fail(e.getMessage());
@@ -260,7 +263,27 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe
     
     assertQ(req("*:*"), "//result[@numFound='0']");
   }
+  
+  public void testCursorMarkNoSort() throws SolrServerException, IOException { // a bad cursorMark setup must import nothing
+    assertQ(req("*:*"), "//result[@numFound='0']");
+    addDocumentsToSolr(generateSolrDocuments(7));
+    try {     
+      List<String> errors = Arrays.asList("sort='id'", //wrong sort spec
+          "", //no sort spec
+          "sort='id asc' timeout='12345'"); // sort is fine, but set timeout
+      Collections.shuffle(errors, random()); // each run exercises one randomly chosen failure scenario
+      String attrs = "query='*:*' rows='2' fl='id,desc' cursorMark='true' "
+                                                            + errors.get(0);
+      runFullImport(generateDIHConfig(attrs,
+            false));
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      fail(e.getMessage());
+    }
     
+    assertQ(req("*:*"), "//result[@numFound='0']"); // still empty: no document was imported
+  }
+  
   private static List<Map<String,Object>> generateSolrDocuments(int num) {
     List<Map<String,Object>> docList = new ArrayList<>();
     for (int i = 1; i <= num; i++) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc862d8e/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java
index a8fcbb1..a2a9fff 100644
--- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java
+++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java
@@ -18,11 +18,23 @@ package org.apache.solr.handler.dataimport;
 
 import java.util.*;
 
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CursorMarkParams;
+import org.apache.solr.handler.dataimport.SolrEntityProcessor.SolrDocumentListIterator;
+import org.junit.Test;
+
 /**
  * Unit test of SolrEntityProcessor. A very basic test outside of the DIH.
  */
 public class TestSolrEntityProcessorUnit extends AbstractDataImportHandlerTestCase {
 
+  private static final class NoNextMockProcessor extends SolrEntityProcessor { // processor whose page fetch is disabled
+    @Override
+    protected void nextPage() { // intentionally empty: buildIterator() then only creates the initial iterator
+    }
+  }
+
   private static final String ID = "id";
 
   public void testQuery() {
@@ -85,6 +97,64 @@ public class TestSolrEntityProcessorUnit extends AbstractDataImportHandlerTestCa
       processor.destroy();
     }
   }
+  @Test (expected = DataImportHandlerException.class)
+  public void testNoQuery() { // no 'query' attribute configured: buildIterator() is expected to throw
+    SolrEntityProcessor processor = new SolrEntityProcessor();
+    
+    HashMap<String,String> entityAttrs = new HashMap<String,String>(){{put(SolrEntityProcessor.SOLR_SERVER,"http://route:66/no");}};
+    processor.init(getContext(null, null, null, null, Collections.emptyList(), 
+        entityAttrs));
+    try {
+    processor.buildIterator();
+    }finally {
+      processor.destroy();
+    }
+  }
+  
+  public void testPagingQuery() { // unless cursorMark is exactly 'true', paging uses start + timeAllowed
+    SolrEntityProcessor processor = new NoNextMockProcessor() ;
+    
+    HashMap<String,String> entityAttrs = new HashMap<String,String>(){{
+      put(SolrEntityProcessor.SOLR_SERVER,"http://route:66/no");
+      if (random().nextBoolean()) {
+        List<String> noCursor = Arrays.asList("","false",CursorMarkParams.CURSOR_MARK_START);//only 'true' not '*'
+        Collections.shuffle(noCursor, random());
+        put(CursorMarkParams.CURSOR_MARK_PARAM,  noCursor.get(0));
+      }}};
+    processor.init(getContext(null, null, null, null, Collections.emptyList(), 
+        entityAttrs));
+    try {
+    processor.buildIterator();
+    SolrQuery query = new SolrQuery();
+    ((SolrDocumentListIterator) processor.rowIterator).passNextPage(query);
+    assertEquals("0", query.get(CommonParams.START)); // the first page starts at offset 0
+    assertNull( query.get(CursorMarkParams.CURSOR_MARK_PARAM)); // no cursorMark is sent
+    assertNotNull( query.get(CommonParams.TIME_ALLOWED)); // timeAllowed is always set in offset mode
+    }finally {
+      processor.destroy();
+    }
+  }
+  
+  public void testCursorQuery() { // cursorMark='true': the first query carries only the start mark
+    SolrEntityProcessor processor = new NoNextMockProcessor() ;
+    
+    HashMap<String,String> entityAttrs = new HashMap<String,String>(){{
+      put(SolrEntityProcessor.SOLR_SERVER,"http://route:66/no");
+      put(CursorMarkParams.CURSOR_MARK_PARAM,"true");
+      }};
+    processor.init(getContext(null, null, null, null, Collections.emptyList(), 
+        entityAttrs));
+    try {
+    processor.buildIterator();
+    SolrQuery query = new SolrQuery();
+    ((SolrDocumentListIterator) processor.rowIterator).passNextPage(query);
+    assertNull(query.get(CommonParams.START)); // no start offset in cursor mode
+    assertEquals(CursorMarkParams.CURSOR_MARK_START, query.get(CursorMarkParams.CURSOR_MARK_PARAM));
+    assertNull( query.get(CommonParams.TIME_ALLOWED)); // timeAllowed is not set in cursor mode
+    }finally {
+      processor.destroy();
+    }
+  }
 
   private List<Doc> generateUniqueDocs(int numDocs) {
     List<FldType> types = new ArrayList<>();