You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2018/10/09 06:46:29 UTC
lucene-solr:branch_7x: SOLR-12843: Implement a MultiContentWriter in
SolrJ to post multiple files/payload at once
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x b3ab84e44 -> a1f03ba43
SOLR-12843: Implement a MultiContentWriter in SolrJ to post multiple files/payload at once
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/a1f03ba4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/a1f03ba4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/a1f03ba4
Branch: refs/heads/branch_7x
Commit: a1f03ba43b02c2bbaebfc443b6ab4e1f68088062
Parents: b3ab84e
Author: Noble Paul <no...@apache.org>
Authored: Tue Oct 9 17:44:40 2018 +1100
Committer: Noble Paul <no...@apache.org>
Committed: Tue Oct 9 17:46:11 2018 +1100
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../solr/handler/UpdateRequestHandler.java | 2 +-
.../solr/handler/loader/JavabinLoader.java | 58 +++++++++
.../request/MultiContentWriterRequest.java | 123 +++++++++++++++++++
.../solr/client/solrj/SolrExampleTests.java | 41 ++++++-
5 files changed, 224 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a1f03ba4/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ad77958..a755161 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -56,6 +56,8 @@ New Features
* SOLR-12815: Implement maxOps limit for IndexSizeTrigger. (ab)
+* SOLR-12843: Implement a MultiContentWriterRequest in SolrJ to post multiple files/payloads in a single request (noble)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a1f03ba4/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
index 3c7ffda..cbe2cba 100644
--- a/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/UpdateRequestHandler.java
@@ -142,7 +142,7 @@ public class UpdateRequestHandler extends ContentStreamHandlerBase implements Pe
registry.put("application/xml", new XMLLoader().init(p) );
registry.put("application/json", new JsonLoader().init(p) );
registry.put("application/csv", new CSVLoader().init(p) );
- registry.put("application/javabin", new JavabinLoader().init(p) );
+ registry.put("application/javabin", new JavabinLoader(instance).init(p) );
registry.put("text/csv", registry.get("application/csv") );
registry.put("text/xml", registry.get("application/xml") );
registry.put("text/json", registry.get("application/json"));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a1f03ba4/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index f502a8e..01f5f60 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -19,6 +19,8 @@ package org.apache.solr.handler.loader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -31,7 +33,11 @@ import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.apache.solr.common.util.DataInputInputStream;
import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -46,6 +52,16 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
* @see org.apache.solr.common.util.JavaBinCodec
*/
public class JavabinLoader extends ContentStreamLoader {
+  /** Loader that every payload of a {@code multistream=true} request is handed to. */
+  final ContentStreamLoader contentStreamLoader;
+
+  /** Creates a loader that hands multistream payloads back to itself. */
+  public JavabinLoader() {
+    this.contentStreamLoader = this;
+  }
+
+  /**
+   * Creates a loader that delegates each multistream payload to another loader.
+   *
+   * @param contentStreamLoader the loader each payload of a {@code multistream=true} request is
+   *                            passed to (presumably a dispatcher that selects a loader by content
+   *                            type - confirm against the caller)
+   */
+  public JavabinLoader(ContentStreamLoader contentStreamLoader) {
+    this.contentStreamLoader = contentStreamLoader;
+  }
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
@@ -62,6 +78,10 @@ public class JavabinLoader extends ContentStreamLoader {
private void parseAndLoadDocs(final SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream,
final UpdateRequestProcessor processor) throws IOException {
+ if (req.getParams().getBool("multistream", false)) {
+ handleMultiStream(req, rsp, stream, processor);
+ return;
+ }
UpdateRequest update = null;
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
private AddUpdateCommand addCmd = null;
@@ -116,6 +136,44 @@ public class JavabinLoader extends ContentStreamLoader {
}
}
+  /**
+   * Handles a javabin body sent with {@code multistream=true}.
+   * <p>
+   * The body is a javabin iterator. Each {@link NamedList} entry becomes the request params for
+   * the next {@code byte[]} entry. Each {@code byte[]} entry is loaded through
+   * {@link #contentStreamLoader}. The original request params are restored after every payload,
+   * whether or not loading succeeded. Any other entry type is rejected.
+   *
+   * @throws IOException if the javabin body cannot be read
+   */
+  private void handleMultiStream(SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream, UpdateRequestProcessor processor)
+      throws IOException {
+    FastInputStream in = FastInputStream.wrap(stream);
+    SolrParams old = req.getParams();
+    new JavaBinCodec() {
+      // params for the next payload; reset after that payload has been handled
+      SolrParams params;
+
+      @Override
+      public List<Object> readIterator(DataInputInputStream fis) throws IOException {
+        while (true) {
+          Object o = readVal(fis);
+          if (o == END_OBJ) break;
+          if (o instanceof NamedList) {
+            params = ((NamedList) o).toSolrParams();
+            continue;
+          }
+          try {
+            if (!(o instanceof byte[])) {
+              throw new RuntimeException("unsupported type " + (o == null ? "null" : o.getClass().getName()));
+            }
+            if (params != null) req.setParams(params);
+            byte[] buf = (byte[]) o;
+            contentStreamLoader.load(req, rsp, new ContentStreamBase.ByteArrayStream(buf, null), processor);
+          } catch (RuntimeException e) {
+            // already unchecked (e.g. a SolrException with an HTTP status): propagate it unchanged
+            throw e;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          } finally {
+            params = null;
+            req.setParams(old);
+          }
+        }
+        return Collections.emptyList();
+      }
+    }.unmarshal(in);
+  }
+
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand(req);
addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a1f03ba4/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java
new file mode 100644
index 0000000..1a206b8
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/MultiContentWriterRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.request;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
+import org.apache.solr.common.IteratorWriter;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Pair;
+
+import static org.apache.solr.common.params.UpdateParams.ASSUME_CONTENT_TYPE;
+
+public class MultiContentWriterRequest extends AbstractUpdateRequest {
+
+ private final Iterator<Pair<NamedList, Object>> payload;
+
+ /**
+ *
+ * @param m HTTP method
+ * @param path path to which to post to
+ * @param payload add the per doc params, The Object could be a ByteBuffer or byte[]
+ */
+
+ public MultiContentWriterRequest(METHOD m, String path, Iterator<Pair<NamedList, Object>> payload) {
+ super(m, path);
+ params = new ModifiableSolrParams();
+ params.add("multistream", "true");
+ this.payload = payload;
+ }
+
+
+ @Override
+ public RequestWriter.ContentWriter getContentWriter(String expectedType) {
+ return new RequestWriter.ContentWriter() {
+ @Override
+ public void write(OutputStream os) throws IOException {
+ new JavaBinCodec().marshal((IteratorWriter) iw -> {
+ while (payload.hasNext()) {
+ Pair<NamedList, Object> next = payload.next();
+
+ if (next.second() instanceof ByteBuffer || next.second() instanceof byte[]) {
+ NamedList params = next.first();
+ if(params.get(ASSUME_CONTENT_TYPE) == null){
+ String detectedType = detect(next.second());
+ if(detectedType==null){
+ throw new RuntimeException("Unknown content type");
+ }
+ params.add(ASSUME_CONTENT_TYPE, detectedType);
+ }
+ iw.add(params);
+ iw.add(next.second());
+ } else {
+ throw new RuntimeException("payload value must be byte[] or ByteBuffer");
+ }
+ }
+ }, os);
+ }
+
+ @Override
+ public String getContentType() {
+ return "application/javabin";
+ }
+ };
+ }
+ public static String detect(Object o) throws IOException {
+ Reader rdr = null;
+ byte[] bytes = null;
+ if (o instanceof byte[]) bytes = (byte[]) o;
+ else if (o instanceof ByteBuffer) bytes = ((ByteBuffer) o).array();
+ rdr = new InputStreamReader(new ByteArrayInputStream(bytes));
+ String detectedContentType = null;
+ for (;;) {
+ int ch = rdr.read();
+ if (Character.isWhitespace(ch)) {
+ continue;
+ }
+ int nextChar = -1;
+ // first non-whitespace chars
+ if (ch == '#' // single line comment
+ || (ch == '/' && ((nextChar = rdr.read()) == '/' || nextChar == '*')) // single line or multi-line comment
+ || (ch == '{' || ch == '[') // start of JSON object
+ )
+ {
+ detectedContentType = "application/json";
+ } else if (ch == '<') {
+ detectedContentType = "text/xml";
+ }
+ break;
+ }
+ return detectedContentType;
+ }
+
+ public static ByteBuffer readByteBuffer(InputStream is) throws IOException {
+ BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
+ org.apache.commons.io.IOUtils.copy(is, baos);
+ return ByteBuffer.wrap(baos.getbuf(), 0, baos.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a1f03ba4/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
index 1dabe5d..cb375d6 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java
@@ -17,8 +17,11 @@
package org.apache.solr.client.solrj;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -43,6 +46,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.request.LukeRequest;
+import org.apache.solr.client.solrj.request.MultiContentWriterRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.StreamingUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -64,11 +68,14 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.FacetParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.Utils;
import org.junit.Test;
import org.noggit.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.params.UpdateParams.ASSUME_CONTENT_TYPE;
import static org.junit.internal.matchers.StringContains.containsString;
/**
@@ -671,7 +678,39 @@ abstract public class SolrExampleTests extends SolrExampleTestsBase
Assert.assertEquals( 10, rsp.getResults().getNumFound() );
}
- @Test
+  @Test
+  public void testMultiContentWriterRequest() throws Exception {
+    SolrClient client = getSolrClient();
+    client.deleteByQuery("*:*");// delete everything!
+    client.commit();
+    QueryResponse rsp = client.query(new SolrQuery("*:*"));
+    Assert.assertEquals(0, rsp.getResults().getNumFound());
+
+    List<Pair<NamedList, Object>> docs = new ArrayList<>();
+    // no ASSUME_CONTENT_TYPE: the request detects the type from the payload bytes
+    docs.add(new Pair<>(new NamedList(), getFileContent("solrj/docs1.xml")));
+
+    NamedList csvParams = new NamedList();
+    csvParams.add(ASSUME_CONTENT_TYPE, "application/csv");
+    docs.add(new Pair<>(csvParams, getFileContent("solrj/books.csv")));
+
+    MultiContentWriterRequest up = new MultiContentWriterRequest(SolrRequest.METHOD.POST, "/update", docs.iterator());
+    up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+    client.request(up);
+    rsp = client.query(new SolrQuery("*:*"));
+    Assert.assertEquals(12, rsp.getResults().getNumFound());
+  }
+
+  /** Reads the named test file fully into a ByteBuffer. */
+  private ByteBuffer getFileContent(String name) throws IOException {
+    try (InputStream is = new FileInputStream(getFile(name))) {
+      return MultiContentWriterRequest.readByteBuffer(is);
+    }
+  }
+
+
+ @Test
public void testMultiContentStreamRequest() throws Exception {
SolrClient client = getSolrClient();
client.deleteByQuery("*:*");// delete everything!