You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2018/10/09 06:44:57 UTC

lucene-solr:master: SOLR-12843: Implement a MultiContentWriter in SolrJ to post multiple files/payload at once

Repository: lucene-solr
Updated Branches:
  refs/heads/master dbed8bafe -> b4d9b25f4


SOLR-12843: Implement a MultiContentWriter in SolrJ to post multiple files/payload at once


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b4d9b25f
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b4d9b25f
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b4d9b25f

Branch: refs/heads/master
Commit: b4d9b25f4430d1c6491986d6f1805210bf1cfd39
Parents: dbed8ba
Author: Noble Paul <no...@apache.org>
Authored: Tue Oct 9 17:44:40 2018 +1100
Committer: Noble Paul <no...@apache.org>
Committed: Tue Oct 9 17:44:40 2018 +1100

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../solr/handler/UpdateRequestHandler.java      |   2 +-
 .../solr/handler/loader/JavabinLoader.java      |  58 +++++++++
 .../request/MultiContentWriterRequest.java      | 123 +++++++++++++++++++
 .../solr/client/solrj/SolrExampleTests.java     |  41 ++++++-
 5 files changed, 224 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4d9b25f/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9bf6080..49e425d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -138,6 +138,8 @@ New Features
 
 * SOLR-12815: Implement maxOps limit for IndexSizeTrigger. (ab)
 
+* SOLR-12843: Implement a MultiContentWriter in SolrJ to post multiple files/payload at once (noble)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4d9b25f/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
index 3c7ffda..cbe2cba 100644
--- a/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
@@ -142,7 +142,7 @@ public class UpdateRequestHandler extends ContentStreamHandlerBase implements Pe
     registry.put("application/xml", new XMLLoader().init(p) );
     registry.put("application/json", new JsonLoader().init(p) );
     registry.put("application/csv", new CSVLoader().init(p) );
-    registry.put("application/javabin", new JavabinLoader().init(p) );
+    registry.put("application/javabin", new JavabinLoader(instance).init(p) );
     registry.put("text/csv", registry.get("application/csv") );
     registry.put("text/xml", registry.get("application/xml") );
     registry.put("text/json", registry.get("application/json"));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4d9b25f/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index f502a8e..01f5f60 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -19,6 +19,8 @@ package org.apache.solr.handler.loader;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -31,7 +33,11 @@ import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.apache.solr.common.util.DataInputInputStream;
 import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.handler.RequestHandlerUtils;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
@@ -46,6 +52,16 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
  * @see org.apache.solr.common.util.JavaBinCodec
  */
 public class JavabinLoader extends ContentStreamLoader {
+  // Loader that handleMultiStream() hands each embedded byte[] payload of a "multistream" request to.
+  final ContentStreamLoader contentStreamLoader;
+
+  /**
+   * Creates a loader that uses itself as the delegate for embedded multistream payloads.
+   */
+  public JavabinLoader() {
+    this.contentStreamLoader = this;
+  }
+
+  /**
+   * Creates a loader that passes embedded multistream payloads to another loader.
+   *
+   * @param contentStreamLoader the loader whose {@code load} method is called for each embedded payload
+   */
+  public JavabinLoader(ContentStreamLoader contentStreamLoader) {
+    super();
+    this.contentStreamLoader = contentStreamLoader;
+  }
 
   @Override
   public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
@@ -62,6 +78,10 @@ public class JavabinLoader extends ContentStreamLoader {
   
   private void parseAndLoadDocs(final SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream,
                                 final UpdateRequestProcessor processor) throws IOException {
+    if (req.getParams().getBool("multistream", false)) {
+      handleMultiStream(req, rsp, stream, processor);
+      return;
+    }
     UpdateRequest update = null;
     JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
       private AddUpdateCommand addCmd = null;
@@ -116,6 +136,44 @@ public class JavabinLoader extends ContentStreamLoader {
     }
   }
 
+  /**
+   * Reads a javabin stream of a request sent with {@code multistream=true} and loads every
+   * embedded payload through {@link #contentStreamLoader}.
+   * <p>
+   * Items are consumed one by one: a {@link NamedList} is remembered as the params for the next
+   * payload, and a {@code byte[]} is loaded as a content stream. While a payload is being loaded the
+   * request params are replaced by the remembered params (if any); the original params are restored
+   * after each payload, whether or not loading succeeded.
+   *
+   * @param req       the update request; its params are swapped temporarily per payload
+   * @param rsp       the response passed through to the delegate loader
+   * @param stream    the raw javabin request body
+   * @param processor the update processor passed through to the delegate loader
+   * @throws IOException      if the javabin stream cannot be read
+   * @throws RuntimeException if an item is neither a {@code NamedList} nor a {@code byte[]}, or
+   *                          wrapping any exception thrown by the delegate loader
+   */
+  private void handleMultiStream(SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream, UpdateRequestProcessor processor)
+      throws IOException {
+    FastInputStream in = FastInputStream.wrap(stream);
+    SolrParams old = req.getParams();
+    new JavaBinCodec() {
+      // params announced by the most recent NamedList item; applied to the next byte[] payload only
+      SolrParams params;
+
+      @Override
+      public List<Object> readIterator(DataInputInputStream fis) throws IOException {
+        while (true) {
+          Object o = readVal(fis);
+          if (o == END_OBJ) break;
+          if (o instanceof NamedList) {
+            params = ((NamedList) o).toSolrParams();
+            continue;
+          }
+          if (!(o instanceof byte[])) {
+            // reject before touching the request so the failure is reported once, with the offending type
+            throw new RuntimeException("unsupported type in multistream payload: "
+                + (o == null ? "null" : o.getClass().getName()));
+          }
+          try {
+            if (params != null) req.setParams(params);
+            contentStreamLoader.load(req, rsp, new ContentStreamBase.ByteArrayStream((byte[]) o, null), processor);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          } finally {
+            // never let one payload's params leak into the next payload or the outer request
+            params = null;
+            req.setParams(old);
+          }
+        }
+        // payloads are processed while streaming; nothing is materialized for the caller
+        return Collections.emptyList();
+      }
+
+    }.unmarshal(in);
+  }
+
   private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
     AddUpdateCommand addCmd = new AddUpdateCommand(req);
     addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4d9b25f/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java
new file mode 100644
index 0000000..1a206b8
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.request;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Pair;
+
+import static org.apache.solr.common.params.UpdateParams.ASSUME_CONTENT_TYPE;
+
+/**
+ * An update request that sends several payloads in a single javabin-encoded request body.
+ * <p>
+ * The request carries {@code multistream=true}. The body is written as a sequence in which each
+ * payload is preceded by its own {@link NamedList} of params. When a payload's params contain no
+ * {@code ASSUME_CONTENT_TYPE} entry, one is added to that {@code NamedList} using
+ * {@link #detect(Object)}.
+ * <p>
+ * The payload iterator is consumed while the body is written, so writing the same instance a
+ * second time emits no payloads.
+ */
+public class MultiContentWriterRequest extends AbstractUpdateRequest {
+
+  private final Iterator<Pair<NamedList, Object>> payload;
+
+  /**
+   * @param m HTTP method
+   * @param path path to post to
+   * @param payload pairs of (per-payload params, content); the content must be a {@code byte[]} or a {@link ByteBuffer}
+   */
+  public MultiContentWriterRequest(METHOD m, String path, Iterator<Pair<NamedList, Object>> payload) {
+    super(m, path);
+    params = new ModifiableSolrParams();
+    params.add("multistream", "true");
+    this.payload = payload;
+  }
+
+
+  /**
+   * Returns a writer that streams all remaining payloads as one javabin iterator.
+   * Writing fails with a {@link RuntimeException} if a payload is neither a {@code byte[]} nor a
+   * {@link ByteBuffer}, or if it has no content type and none can be detected.
+   */
+  @Override
+  public RequestWriter.ContentWriter getContentWriter(String expectedType) {
+    return new RequestWriter.ContentWriter() {
+      @Override
+      public void write(OutputStream os) throws IOException {
+        new JavaBinCodec().marshal((IteratorWriter) iw -> {
+          while (payload.hasNext()) {
+            Pair<NamedList, Object> next = payload.next();
+
+            if (next.second() instanceof ByteBuffer || next.second() instanceof byte[]) {
+              NamedList params = next.first();
+              if(params.get(ASSUME_CONTENT_TYPE) == null){
+                String detectedType = detect(next.second());
+                if(detectedType==null){
+                  throw new RuntimeException("Unknown content type");
+                }
+                // note: this modifies the NamedList supplied by the caller
+                params.add(ASSUME_CONTENT_TYPE, detectedType);
+              }
+              // params always go first, immediately followed by the content they describe
+              iw.add(params);
+              iw.add(next.second());
+            }  else {
+              throw new RuntimeException("payload value must be byte[] or ByteBuffer");
+            }
+          }
+        }, os);
+      }
+
+      @Override
+      public String getContentType() {
+        return "application/javabin";
+      }
+    };
+  }
+
+  /**
+   * Guesses the content type of a payload from its first non-whitespace character(s).
+   * <p>
+   * A leading <code>{</code>, <code>[</code>, {@code #}, {@code //} or {@code /*} is reported as JSON,
+   * a leading {@code <} as XML. The bytes are decoded as UTF-8.
+   *
+   * @param o a {@code byte[]}, or a {@link ByteBuffer} which is examined from its position to its
+   *          limit without being modified; heap, direct and read-only buffers are all accepted
+   * @return {@code "application/json"}, {@code "text/xml"}, or {@code null} when the type cannot be
+   *         determined (this includes empty input and unsupported argument types)
+   * @throws IOException if decoding the bytes fails
+   */
+  public static String detect(Object o) throws IOException {
+    final InputStream in;
+    if (o instanceof byte[]) {
+      in = new ByteArrayInputStream((byte[]) o);
+    } else if (o instanceof ByteBuffer) {
+      // Read through a duplicate instead of array(): array() ignores position/limit/arrayOffset and
+      // throws for direct or read-only buffers. The duplicate also keeps the caller's position intact.
+      final ByteBuffer view = ((ByteBuffer) o).duplicate();
+      in = new InputStream() {
+        @Override
+        public int read() {
+          return view.hasRemaining() ? (view.get() & 0xFF) : -1;
+        }
+      };
+    } else {
+      return null;
+    }
+    Reader rdr = new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8);
+
+    // skip leading whitespace; -1 (end of input) is not whitespace and ends the loop
+    int ch;
+    do {
+      ch = rdr.read();
+    } while (Character.isWhitespace(ch));
+
+    if (ch == '#' || ch == '{' || ch == '[') {
+      // single line comment, or start of a JSON object/array
+      return "application/json";
+    }
+    if (ch == '/') {
+      // single line or multi-line comment
+      int nextChar = rdr.read();
+      return (nextChar == '/' || nextChar == '*') ? "application/json" : null;
+    }
+    if (ch == '<') {
+      return "text/xml";
+    }
+    return null;
+  }
+
+  /**
+   * Reads the given stream to its end into memory.
+   *
+   * @param is the stream to read; it is not closed by this method
+   * @return a buffer wrapping the first {@code size()} bytes of the array the in-memory sink exposes via {@code getbuf()}
+   * @throws IOException if reading the stream fails
+   */
+  public static ByteBuffer readByteBuffer(InputStream is) throws IOException {
+    BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+    org.apache.commons.io.IOUtils.copy(is, baos);
+    return ByteBuffer.wrap(baos.getbuf(), 0, baos.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b4d9b25f/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
index 1dabe5d..cb375d6 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
@@ -17,8 +17,11 @@
 package org.apache.solr.client.solrj;
 
 
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -43,6 +46,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION;
 import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
 import org.apache.solr.client.solrj.request.LukeRequest;
+import org.apache.solr.client.solrj.request.MultiContentWriterRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.StreamingUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -64,11 +68,14 @@ import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.FacetParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.Utils;
 import org.junit.Test;
 import org.noggit.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.params.UpdateParams.ASSUME_CONTENT_TYPE;
 import static org.junit.internal.matchers.StringContains.containsString;
 
 /**
@@ -671,7 +678,39 @@ abstract public class SolrExampleTests extends SolrExampleTestsBase
     Assert.assertEquals( 10, rsp.getResults().getNumFound() );
   }
 
- @Test
+  /**
+   * Posts an XML file and a CSV file in one {@link MultiContentWriterRequest} and checks that the
+   * index afterwards holds 12 documents.
+   */
+  @Test
+  public void testMultiContentWriterRequest() throws Exception {
+    SolrClient solr = getSolrClient();
+    solr.deleteByQuery("*:*"); // start from an empty index
+    solr.commit();
+    Assert.assertEquals(0, solr.query(new SolrQuery("*:*")).getResults().getNumFound());
+
+    // first payload: params carry no content type
+    NamedList xmlParams = new NamedList();
+    // second payload: content type is stated explicitly
+    NamedList csvParams = new NamedList();
+    csvParams.add(ASSUME_CONTENT_TYPE, "application/csv");
+
+    List<Pair<NamedList, Object>> payloads = new ArrayList<>();
+    payloads.add(new Pair(xmlParams, getFileContent(xmlParams, "solrj/docs1.xml")));
+    payloads.add(new Pair(csvParams, getFileContent(csvParams, "solrj/books.csv")));
+
+    MultiContentWriterRequest request =
+        new MultiContentWriterRequest(SolrRequest.METHOD.POST, "/update", payloads.iterator());
+    request.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+    NamedList<Object> result = solr.request(request);
+    System.out.println(result.jsonStr());
+
+    QueryResponse afterUpdate = solr.query(new SolrQuery("*:*"));
+    Assert.assertEquals(12, afterUpdate.getResults().getNumFound());
+  }
+
+  /**
+   * Loads a test resource fully into memory.
+   *
+   * @param nl   not used by this helper
+   * @param name name of the test file, resolved through {@code getFile}
+   * @return the file's bytes as a buffer
+   */
+  private ByteBuffer getFileContent(NamedList nl, String name) throws IOException {
+    try (InputStream source = new FileInputStream(getFile(name))) {
+      ByteBuffer content = MultiContentWriterRequest.readByteBuffer(source);
+      return content;
+    }
+  }
+
+
+  @Test
  public void testMultiContentStreamRequest() throws Exception {
     SolrClient client = getSolrClient();
     client.deleteByQuery("*:*");// delete everything!