You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2011/11/22 13:14:04 UTC

svn commit: r1204946 - in /lucene/dev/branches/branch_3x: ./ lucene/ lucene/backwards/src/test-framework/ lucene/backwards/src/test/ solr/ solr/core/src/java/org/apache/solr/handler/ solr/solrj/ solr/solrj/src/java/org/apache/solr/client/solrj/request/...

Author: shalin
Date: Tue Nov 22 12:14:01 2011
New Revision: 1204946

URL: http://svn.apache.org/viewvc?rev=1204946&view=rev
Log:
SOLR-2904 -- BinaryUpdateRequestHandler should be able to accept multiple update requests from a stream

Modified:
    lucene/dev/branches/branch_3x/   (props changed)
    lucene/dev/branches/branch_3x/lucene/   (props changed)
    lucene/dev/branches/branch_3x/lucene/backwards/src/test/   (props changed)
    lucene/dev/branches/branch_3x/lucene/backwards/src/test-framework/   (props changed)
    lucene/dev/branches/branch_3x/solr/   (props changed)
    lucene/dev/branches/branch_3x/solr/CHANGES.txt
    lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java
    lucene/dev/branches/branch_3x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_3x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
    lucene/dev/branches/branch_3x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java

Modified: lucene/dev/branches/branch_3x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/CHANGES.txt?rev=1204946&r1=1204945&r2=1204946&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_3x/solr/CHANGES.txt Tue Nov 22 12:14:01 2011
@@ -23,6 +23,11 @@ on how to get started.
 
 ==================  3.6.0  ==================
 
+New Features
+----------------------
+* SOLR-2904: BinaryUpdateRequestHandler should be able to accept multiple update requests from
+  a stream (shalin)
+
 ==================  3.5.0  ==================
 Versions of Major Components
 ---------------------

Modified: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java?rev=1204946&r1=1204945&r2=1204946&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java (original)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java Tue Nov 22 12:14:01 2011
@@ -24,12 +24,14 @@ import org.apache.solr.common.SolrInputD
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.FastInputStream;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -66,30 +68,49 @@ public class BinaryUpdateRequestHandler 
   private void parseAndLoadDocs(SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream,
                                 final UpdateRequestProcessor processor) throws IOException {
     UpdateRequest update = null;
-    update = new JavaBinUpdateRequestCodec().unmarshal(stream,
-            new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
-              private AddUpdateCommand addCmd = null;
-
-              public void document(SolrInputDocument document, UpdateRequest updateRequest) {
-                if (addCmd == null) {
-                  addCmd = getAddCommand(updateRequest.getParams());
-                }
-                addCmd.solrDoc = document;
-                try {
-                  processor.processAdd(addCmd);
-                  addCmd.clear();
-                } catch (IOException e) {
-                  throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + document);
-                }
-              }
-            });
-    if (update.getDeleteById() != null) {
-      delete(update.getDeleteById(), processor, true);
-    }
-    if (update.getDeleteQuery() != null) {
-      delete(update.getDeleteQuery(), processor, false);
-    }
+    JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
+      private AddUpdateCommand addCmd = null;
 
+      public void update(SolrInputDocument document, UpdateRequest updateRequest) {
+        if (document == null) {
+          // Perhaps commit from the parameters
+          try {
+            RequestHandlerUtils.handleCommit(processor, updateRequest.getParams(), false);
+            RequestHandlerUtils.handleRollback(processor, updateRequest.getParams(), false);
+          } catch (IOException e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR handling commit/rollback");
+          }
+          return;
+        }
+        if (addCmd == null) {
+          addCmd = getAddCommand(updateRequest.getParams());
+        }
+        addCmd.solrDoc = document;
+        try {
+          processor.processAdd(addCmd);
+          addCmd.clear();
+        } catch (IOException e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + document);
+        }
+      }
+    };
+    FastInputStream in = FastInputStream.wrap(stream);
+    for (; ; ) {
+      try {
+        update = new JavaBinUpdateRequestCodec().unmarshal(in, handler);
+      } catch (EOFException e) {
+        break;
+      } catch (Exception e) {
+        log.error("Exception while processing update request", e);
+        break;
+      }
+      if (update.getDeleteById() != null) {
+        delete(update.getDeleteById(), processor, true);
+      }
+      if (update.getDeleteQuery() != null) {
+        delete(update.getDeleteQuery(), processor, false);
+      }
+    }
   }
 
   private AddUpdateCommand getAddCommand(SolrParams params) {

Modified: lucene/dev/branches/branch_3x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1204946&r1=1204945&r2=1204946&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_3x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Tue Nov 22 12:14:01 2011
@@ -89,7 +89,7 @@ public class JavaBinUpdateRequestCodec {
    *
    * @throws IOException in case of an exception while reading from the input stream or unmarshalling
    */
-  public UpdateRequest unmarshal(InputStream is, final StreamingDocumentHandler handler) throws IOException {
+  public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
     final UpdateRequest updateRequest = new UpdateRequest();
     List<List<NamedList>> doclist;
     List<String> delById;
@@ -136,7 +136,13 @@ public class JavaBinUpdateRequestCodec {
         while (true) {
           Object o = readVal(fis);
           if (o == END_OBJ) break;
-          handler.document(listToSolrInputDocument((List<NamedList>) o), updateRequest);
+          if (o instanceof List) {
+            handler.update(listToSolrInputDocument((List<NamedList>) o), updateRequest);
+          } else if (o instanceof NamedList)  {
+            UpdateRequest req = new UpdateRequest();
+            req.setParams(new ModifiableSolrParams(SolrParams.toSolrParams((NamedList) o)));
+            handler.update(null, req);
+          }
         }
         return Collections.EMPTY_LIST;
       }
@@ -210,7 +216,7 @@ public class JavaBinUpdateRequestCodec {
     return nl;
   }
 
-  public static interface StreamingDocumentHandler {
-    public void document(SolrInputDocument document, UpdateRequest req);
+  public static interface StreamingUpdateHandler {
+    public void update(SolrInputDocument document, UpdateRequest req);
   }
 }

Modified: lucene/dev/branches/branch_3x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java?rev=1204946&r1=1204945&r2=1204946&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_3x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java Tue Nov 22 12:14:01 2011
@@ -21,6 +21,7 @@ import junit.framework.Assert;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.util.FastInputStream;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -83,8 +84,8 @@ public class TestUpdateRequestCodec exte
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     codec.marshal(updateRequest, baos);
     final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
-    JavaBinUpdateRequestCodec.StreamingDocumentHandler handler = new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
-      public void document(SolrInputDocument document, UpdateRequest req) {
+    JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
+      public void update(SolrInputDocument document, UpdateRequest req) {
         Assert.assertNotNull(req.getParams());
         docs.add(document);
       }
@@ -131,8 +132,8 @@ public class TestUpdateRequestCodec exte
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     codec.marshal(updateRequest, baos);
     final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
-    JavaBinUpdateRequestCodec.StreamingDocumentHandler handler = new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
-      public void document(SolrInputDocument document, UpdateRequest req) {
+    JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
+      public void update(SolrInputDocument document, UpdateRequest req) {
         Assert.assertNotNull(req.getParams());
         docs.add(document);
       }
@@ -154,7 +155,7 @@ public class TestUpdateRequestCodec exte
 
   }
 
-      
+
 
   private void compareDocs(String m, 
                            SolrInputDocument expectedDoc,