Posted to commits@lucene.apache.org by no...@apache.org on 2019/08/13 08:54:28 UTC
[lucene-solr] branch master updated: SOLR-13688: Run the bin/solr export command multithreaded
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 07ca02b SOLR-13688: Run the bin/solr export command multithreaded
07ca02b is described below
commit 07ca02b7375a9c2564aba4c905e880a32d16e1df
Author: noble <no...@apache.org>
AuthorDate: Tue Aug 13 18:54:05 2019 +1000
SOLR-13688: Run the bin/solr export command multithreaded
---
.../src/java/org/apache/solr/util/ExportTool.java | 344 ++++++++++++++++-----
.../test/org/apache/solr/util/TestExportTool.java | 188 +++++++----
2 files changed, 398 insertions(+), 134 deletions(-)
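
The shape of the change, before the diff: the old single-threaded cursor-mark loop is replaced by one producer per shard, each streaming documents into a bounded queue, with a single consumer draining the queue into the output sink; a sentinel document (EOFDOC) marks end-of-stream. Below is a minimal, self-contained sketch of that producer/consumer pattern, with hypothetical names and plain strings standing in for SolrDocuments; it illustrates the pattern, it is not the committed code.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerSketch {
  // Sentinel ("poison pill") compared by identity, like EOFDOC in the patch.
  private static final String EOF = new String("EOF");

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000); // bounded, as in the patch
    List<String> shards = List.of("shard1", "shard2", "shard3");
    ExecutorService producers = Executors.newFixedThreadPool(shards.size());
    ExecutorService consumer = Executors.newSingleThreadExecutor();
    CountDownLatch produced = new CountDownLatch(shards.size());
    CountDownLatch consumed = new CountDownLatch(1);

    // Single consumer: drains the queue into the "sink" until the sentinel arrives.
    consumer.submit(() -> {
      while (true) {
        String doc;
        try {
          doc = queue.poll(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          break;
        }
        if (doc == null || doc == EOF) break; // timeout or end-of-stream
        System.out.println("write " + doc);   // stands in for sink.accept(doc)
      }
      consumed.countDown();
    });

    // One producer per shard: blocks on offer() when the queue is full (back-pressure).
    for (String shard : shards) {
      producers.submit(() -> {
        try {
          for (int i = 0; i < 5; i++) {
            queue.offer(shard + "-doc" + i, 10, TimeUnit.SECONDS);
          }
        } catch (InterruptedException ignored) {
        } finally {
          produced.countDown();
        }
      });
    }

    produced.await();                        // wait for every producer to finish
    queue.offer(EOF, 10, TimeUnit.SECONDS);  // then tell the consumer to stop
    consumed.await();
    producers.shutdownNow();
    consumer.shutdownNow();
  }
}

The bounded queue (1000 slots here, matching the patch) applies back-pressure: producers block in offer() when the consumer falls behind, which keeps memory use flat regardless of how many shards are exporting.
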
diff --git a/solr/core/src/java/org/apache/solr/util/ExportTool.java b/solr/core/src/java/org/apache/solr/util/ExportTool.java
index 7b5525e..ab29800 100644
--- a/solr/core/src/java/org/apache/solr/util/ExportTool.java
+++ b/solr/core/src/java/org/apache/solr/util/ExportTool.java
@@ -17,42 +17,65 @@
package org.apache.solr.util;
+import java.io.BufferedOutputStream;
+import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
+import java.io.Writer;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.StreamingResponseCallback;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.StreamingBinaryResponseParser;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CursorMarkParams;
import org.apache.solr.common.params.MapSolrParams;
-import org.apache.solr.common.util.FastWriter;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrJSONWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
import static org.apache.solr.common.params.CommonParams.FL;
import static org.apache.solr.common.params.CommonParams.JAVABIN;
+import static org.apache.solr.common.params.CommonParams.Q;
+import static org.apache.solr.common.params.CommonParams.SORT;
+import static org.apache.solr.common.util.JavaBinCodec.SOLRINPUTDOC;
public class ExportTool extends SolrCLI.ToolBase {
@Override
@@ -65,7 +88,7 @@ public class ExportTool extends SolrCLI.ToolBase {
return OPTIONS;
}
- public static class Info {
+ public static abstract class Info {
String baseurl;
String format;
String query;
@@ -73,10 +96,12 @@ public class ExportTool extends SolrCLI.ToolBase {
String out;
String fields;
long limit = 100;
- long docsWritten = 0;
+ AtomicLong docsWritten = new AtomicLong(0);
+ int bufferSize = 1024 * 1024;
PrintStream output;
- //for testing purposes only
- public SolrClient solrClient;
+ String uniqueKey;
+ CloudSolrClient solrClient;
+ DocsSink sink;
public Info(String url) {
@@ -117,60 +142,24 @@ public class ExportTool extends SolrCLI.ToolBase {
return JAVABIN.equals(format) ? new JavabinSink(this) : new JsonSink(this);
}
- void exportDocsWithCursorMark() throws SolrServerException, IOException {
- DocsSink sink = getSink();
+ abstract void exportDocs() throws Exception;
+
+ void fetchUniqueKey() throws SolrServerException, IOException {
solrClient = new CloudSolrClient.Builder(Collections.singletonList(baseurl)).build();
- NamedList<Object> rsp1 = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey",
+ NamedList<Object> response = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey",
new MapSolrParams(Collections.singletonMap("collection", coll))));
- String uniqueKey = (String) rsp1.get("uniqueKey");
-
- sink.start();
- try {
- NamedList<Object> rsp;
- SolrQuery q = (new SolrQuery(query))
- .setParam("collection", coll)
- .setRows(100)
- .setSort(SolrQuery.SortClause.asc(uniqueKey));
- if (fields != null) {
- q.setParam(FL, fields);
- }
-
- String cursorMark = CursorMarkParams.CURSOR_MARK_START;
- boolean done = false;
- StreamingResponseCallback streamer = getStreamer(sink);
-
- if(output!= null) output.println("Exporting data to : "+ out);
- while (!done) {
- if (docsWritten >= limit) break;
- QueryRequest request = new QueryRequest(q);
- request.setResponseParser(new StreamingBinaryResponseParser(streamer));
- q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
- rsp = solrClient.request(request);
- String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT);
- if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) {
- break;
- }
- cursorMark = nextCursorMark;
- if(output!= null) output.print(".");
- }
- if(output!= null) output.println("\n DONE!");
- } finally {
- sink.end();
- solrClient.close();
-
- }
+ uniqueKey = (String) response.get("uniqueKey");
}
- private StreamingResponseCallback getStreamer(DocsSink sink) {
+ public static StreamingResponseCallback getStreamer(Consumer<SolrDocument> sink) {
return new StreamingResponseCallback() {
@Override
public void streamSolrDocument(SolrDocument doc) {
try {
sink.accept(doc);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
- docsWritten++;
}
@Override
@@ -186,21 +175,24 @@ public class ExportTool extends SolrCLI.ToolBase {
@Override
protected void runImpl(CommandLine cli) throws Exception {
- Info info = new Info(cli.getOptionValue("url"));
+ String url = cli.getOptionValue("url");
+ Info info = new MultiThreadedRunner(url);
info.query = cli.getOptionValue("query", "*:*");
info.setOutFormat(cli.getOptionValue("out"), cli.getOptionValue("format"));
info.fields = cli.getOptionValue("fields");
info.setLimit(cli.getOptionValue("limit", "100"));
info.output = super.stdout;
- info.exportDocsWithCursorMark();
+ info.exportDocs();
}
interface DocsSink {
- void start() throws IOException;
+ default void start() throws IOException {
+ }
- void accept(SolrDocument document) throws IOException;
+ void accept(SolrDocument document) throws IOException, InterruptedException;
- void end() throws IOException;
+ default void end() throws IOException {
+ }
}
private static final Option[] OPTIONS = {
@@ -236,11 +228,13 @@ public class ExportTool extends SolrCLI.ToolBase {
.create("fields")
};
- private static class JsonSink implements DocsSink {
+ static class JsonSink implements DocsSink {
private final Info info;
- private SolrJSONWriter jsonw;
- private FastWriter writer;
- private FileOutputStream fos;
+ private CharArr charArr = new CharArr(1024 * 2);
+ JSONWriter jsonWriter = new JSONWriter(charArr, -1);
+ private Writer writer;
+ private OutputStream fos;
+ public AtomicLong docs = new AtomicLong();
public JsonSink(Info info) {
this.info = info;
@@ -249,24 +243,27 @@ public class ExportTool extends SolrCLI.ToolBase {
@Override
public void start() throws IOException {
fos = new FileOutputStream(info.out);
- writer = FastWriter.wrap(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
- jsonw = new SolrJSONWriter(writer);
- jsonw.setIndent(false);
+ if (info.bufferSize > 0) {
+ fos = new BufferedOutputStream(fos, info.bufferSize);
+ }
+ writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
}
@Override
public void end() throws IOException {
- jsonw.close();
+ writer.flush();
+ fos.flush();
fos.close();
-
}
@Override
- public void accept(SolrDocument doc) throws IOException {
+ public synchronized void accept(SolrDocument doc) throws IOException {
+ docs.incrementAndGet();
+ charArr.reset();
Map m = new LinkedHashMap(doc.size());
doc.forEach((s, field) -> {
- if (s.equals("_version_")) return;
+ if (s.equals("_version_") || s.equals("_root_")) return;
if (field instanceof List) {
if (((List) field).size() == 1) {
field = ((List) field).get(0);
@@ -274,17 +271,16 @@ public class ExportTool extends SolrCLI.ToolBase {
}
m.put(s, field);
});
- jsonw.writeObj(m);
- writer.flush();
+ jsonWriter.write(m);
+ writer.write(charArr.getArray(), charArr.getStart(), charArr.getEnd());
writer.append('\n');
-
}
}
private static class JavabinSink implements DocsSink {
private final Info info;
JavaBinCodec codec;
- FileOutputStream fos;
+ OutputStream fos;
public JavabinSink(Info info) {
this.info = info;
@@ -293,6 +289,9 @@ public class ExportTool extends SolrCLI.ToolBase {
@Override
public void start() throws IOException {
fos = new FileOutputStream(info.out);
+ if (info.bufferSize > 0) {
+ fos = new BufferedOutputStream(fos, info.bufferSize);
+ }
codec = new JavaBinCodec(fos, null);
codec.writeTag(JavaBinCodec.NAMED_LST, 2);
codec.writeStr("params");
@@ -306,23 +305,208 @@ public class ExportTool extends SolrCLI.ToolBase {
public void end() throws IOException {
codec.writeTag(JavaBinCodec.END);
codec.close();
+ fos.flush();
fos.close();
}
+ private BiConsumer<String, Object> bic = new BiConsumer<>() {
+ @Override
+ public void accept(String s, Object o) {
+ try {
+ if (s.equals("_version_") || s.equals("_root_")) return;
+ codec.writeExternString(s);
+ codec.writeVal(o);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
@Override
- public void accept(SolrDocument doc) throws IOException {
- SolrInputDocument document = new SolrInputDocument();
- doc.forEach((s, o) -> {
- if (s.equals("_version_")) return;
- if (o instanceof List) {
- if (((List) o).size() == 1) o = ((List) o).get(0);
+ public synchronized void accept(SolrDocument doc) throws IOException {
+ int sz = doc.size();
+ if(doc.containsKey("_version_")) sz--;
+ if(doc.containsKey("_root_")) sz--;
+ codec.writeTag(SOLRINPUTDOC, sz);
+ codec.writeFloat(1f); // document boost
+ doc.forEach(bic);
+ }
+ }
+
+ static class MultiThreadedRunner extends Info {
+ ExecutorService producerThreadpool, consumerThreadpool;
+ ArrayBlockingQueue<SolrDocument> queue = new ArrayBlockingQueue<>(1000);
+ SolrDocument EOFDOC = new SolrDocument();
+ volatile boolean failed = false;
+ Map<String, CoreHandler> corehandlers = new HashMap<>();
+
+ public MultiThreadedRunner(String url) {
+ super(url);
+ }
+
+
+ @Override
+ void exportDocs() throws Exception {
+ sink = getSink();
+ fetchUniqueKey();
+ ClusterStateProvider stateProvider = solrClient.getClusterStateProvider();
+ DocCollection coll = stateProvider.getCollection(this.coll);
+ Map<String, Slice> m = coll.getSlicesMap();
+ producerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(m.size(),
+ new DefaultSolrThreadFactory("solrcli-exporter-producers"));
+ consumerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(1,
+ new DefaultSolrThreadFactory("solrcli-exporter-consumer"));
+ sink.start();
+ CountDownLatch consumerlatch = new CountDownLatch(1);
+ try {
+ addConsumer(consumerlatch);
+ addProducers(m);
+ if (output != null) {
+ output.println("NO of shards : " + corehandlers.size());
}
- document.addField(s, o);
+ CountDownLatch producerLatch = new CountDownLatch(corehandlers.size());
+ corehandlers.forEach((s, coreHandler) -> producerThreadpool.submit(() -> {
+ try {
+ coreHandler.exportDocsFromCore();
+ } catch (Exception e) {
+ if(output != null) output.println("Error exporting docs from : "+s);
+
+ }
+ producerLatch.countDown();
+ }));
+
+ producerLatch.await();
+ queue.offer(EOFDOC, 10, TimeUnit.SECONDS);
+ consumerlatch.await();
+ } finally {
+ sink.end();
+ solrClient.close();
+ producerThreadpool.shutdownNow();
+ consumerThreadpool.shutdownNow();
+ if (failed) {
+ try {
+ Files.delete(new File(out).toPath());
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+ }
+
+ private void addProducers(Map<String, Slice> m) {
+ for (Map.Entry<String, Slice> entry : m.entrySet()) {
+ Slice slice = entry.getValue();
+ Replica replica = slice.getLeader();
+ if (replica == null) replica = slice.getReplicas().iterator().next(); // no leader; fall back to the first replica
+ CoreHandler coreHandler = new CoreHandler(replica);
+ corehandlers.put(replica.getCoreName(), coreHandler);
+ }
+ }
+
+ private void addConsumer(CountDownLatch consumerlatch) {
+ consumerThreadpool.submit(() -> {
+ while (true) {
+ SolrDocument doc = null;
+ try {
+ doc = queue.poll(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ if (output != null) output.println("Consumer interrupted");
+ failed = true;
+ break;
+ }
+ if (doc == null) { failed = true; break; } // poll timed out with nothing queued; stop instead of spinning
+ if (doc == EOFDOC) break;
+ try {
+ if (docsWritten.get() > limit) continue;
+ sink.accept(doc);
+ docsWritten.incrementAndGet();
+ } catch (Exception e) {
+ if (output != null) output.println("Failed to write to file " + e.getMessage());
+ failed = true;
+ }
+ }
+ consumerlatch.countDown();
});
+ }
+
- codec.writeSolrInputDocument(document);
+ class CoreHandler {
+ final Replica replica;
+ long expectedDocs;
+ AtomicLong receivedDocs = new AtomicLong();
+ CoreHandler(Replica replica) {
+ this.replica = replica;
+ }
+
+ boolean exportDocsFromCore()
+ throws IOException, SolrServerException {
+ HttpSolrClient client = new HttpSolrClient.Builder(baseurl).build();
+ try {
+ expectedDocs = getDocCount(replica.getCoreName(), client);
+ GenericSolrRequest request;
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add(Q, query);
+ if (fields != null) params.add(FL, fields);
+ params.add(SORT, uniqueKey + " asc");
+ params.add(CommonParams.DISTRIB, "false");
+ params.add(CommonParams.ROWS, "1000");
+ String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+ Consumer<SolrDocument> wrapper = doc -> {
+ try {
+ if (queue.offer(doc, 10, TimeUnit.SECONDS)) {
+ receivedDocs.incrementAndGet();
+ } else {
+ failed = true; // queue stayed full for 10 seconds; the document was dropped
+ }
+ } catch (InterruptedException e) {
+ failed = true;
+ if (output != null) output.println("Failed to queue docs: " + e.getMessage());
+ }
+ };
+ StreamingBinaryResponseParser responseParser = new StreamingBinaryResponseParser(getStreamer(wrapper));
+ while (true) {
+ if (failed) return false;
+ if (docsWritten.get() > limit) return true;
+ params.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+ request = new GenericSolrRequest(SolrRequest.METHOD.GET,
+ "/" + replica.getCoreName() + "/select", params);
+ request.setResponseParser(responseParser);
+ try {
+ NamedList<Object> rsp = client.request(request);
+ String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT);
+ if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) {
+ if (output != null)
+ output.println(StrUtils.formatString("\nExport complete for : {0}, docs : {1}", replica.getCoreName(), receivedDocs.get()));
+ if (expectedDocs != receivedDocs.get()) {
+ if (output != null) {
+ output.println(StrUtils.formatString("Could not download all docs for core {0}, expected: {1}, actual: {2}",
+ replica.getCoreName(), expectedDocs, receivedDocs.get()));
+ }
+ return false;
+ }
+ return true;
+ }
+ cursorMark = nextCursorMark;
+ if (output != null) output.print(".");
+ } catch (SolrServerException e) {
+ if(output != null) output.println("Error reading from server "+ replica.getBaseUrl()+"/"+ replica.getCoreName());
+ failed = true;
+ return false;
+ }
+ }
+ } finally {
+ client.close();
+ }
+ }
}
}
+
+
+ static long getDocCount(String coreName, HttpSolrClient client) throws SolrServerException, IOException {
+ SolrQuery q = new SolrQuery("*:*");
+ q.setRows(0);
+ q.add("distrib", "false");
+ GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET,
+ "/" + coreName + "/select", q);
+ NamedList<Object> res = client.request(request);
+ SolrDocumentList sdl = (SolrDocumentList) res.get("response");
+ return sdl.getNumFound();
+ }
}
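
For reference, with the option set above (url, query, out, format, fields, limit), an export would be invoked roughly as below. The flag spellings are assumed from the OPTIONS array and the bin/solr wrapper, not quoted from documentation:

bin/solr export -url http://localhost:8983/solr/techproducts -format javabin -out /tmp/techproducts.javabin -fields id,name -limit -1

"format" selects the sink built in getSink() (javabin, or jsonl for newline-delimited JSON), and a limit of -1 exports every document, as the tests below exercise.
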
diff --git a/solr/core/src/test/org/apache/solr/util/TestExportTool.java b/solr/core/src/test/org/apache/solr/util/TestExportTool.java
index 1fe5f66..fdfb3c0 100644
--- a/solr/core/src/test/org/apache/solr/util/TestExportTool.java
+++ b/solr/core/src/test/org/apache/solr/util/TestExportTool.java
@@ -19,14 +19,18 @@ package org.apache.solr.util;
import java.io.File;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
@@ -34,6 +38,9 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.JsonRecordReader;
@@ -71,82 +78,155 @@ public class TestExportTool extends SolrCloudTestCase {
String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
- ExportTool.Info info = new ExportTool.Info(url);
-
+ ExportTool.Info info = new ExportTool.MultiThreadedRunner(url);
String absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
info.setOutFormat(absolutePath, "jsonl");
info.setLimit("200");
- info.exportDocsWithCursorMark();
-
- assertTrue(info.docsWritten >= 200);
- JsonRecordReader jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
- Reader rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8);
- try {
- int[] count = new int[]{0};
- jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
- assertTrue(count[0] >= 200);
- } finally {
- rdr.close();
- }
+ info.fields = "id,desc_s";
+ info.exportDocs();
+ assertJsonDocsCount(info, 200);
- info = new ExportTool.Info(url);
+ info = new ExportTool.MultiThreadedRunner(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
info.setOutFormat(absolutePath, "jsonl");
info.setLimit("-1");
- info.exportDocsWithCursorMark();
-
- assertTrue(info.docsWritten >= 1000);
- jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
- rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8);
- try {
- int[] count = new int[]{0};
- jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
- assertTrue(count[0] >= 1000);
- } finally {
- rdr.close();
- }
+ info.fields = "id,desc_s";
+ info.exportDocs();
+ assertJsonDocsCount(info, 1000);
- info = new ExportTool.Info(url);
+ info = new ExportTool.MultiThreadedRunner(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
info.setOutFormat(absolutePath, "javabin");
info.setLimit("200");
- info.exportDocsWithCursorMark();
- assertTrue(info.docsWritten >= 200);
-
- FileInputStream fis = new FileInputStream(absolutePath);
- try {
- int[] count = new int[]{0};
- FastInputStream in = FastInputStream.wrap(fis);
- new JavaBinUpdateRequestCodec()
- .unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
- assertTrue(count[0] >= 200);
- } finally {
- fis.close();
- }
+ info.fields = "id,desc_s";
+ info.exportDocs();
- info = new ExportTool.Info(url);
+ assertJavabinDocsCount(info, 200);
+
+ info = new ExportTool.MultiThreadedRunner(url);
absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
info.setOutFormat(absolutePath, "javabin");
info.setLimit("-1");
- info.exportDocsWithCursorMark();
- assertTrue(info.docsWritten >= 1000);
-
- fis = new FileInputStream(absolutePath);
- try {
- int[] count = new int[]{0};
- FastInputStream in = FastInputStream.wrap(fis);
- new JavaBinUpdateRequestCodec()
- .unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
- assertTrue(count[0] >= 1000);
- } finally {
- fis.close();
+ info.fields = "id,desc_s";
+ info.exportDocs();
+ assertJavabinDocsCount(info, 1000);
+
+ } finally {
+ cluster.shutdown();
+
+ }
+ }
+
+ @Nightly
+ public void testVeryLargeCluster() throws Exception {
+ String COLLECTION_NAME = "veryLargeColl";
+ MiniSolrCloudCluster cluster = configureCluster(4)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+
+ try {
+ CollectionAdminRequest
+ .createCollection(COLLECTION_NAME, "conf", 8, 1)
+ .setMaxShardsPerNode(10)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(COLLECTION_NAME, 8, 8);
+
+ String tmpFileLoc = new File(cluster.getBaseDir().toFile().getAbsolutePath() +
+ File.separator).getPath();
+ String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
+
+
+ int docCount = 0;
+
+ for (int j = 0; j < 4; j++) {
+ int bsz = 10000;
+ UpdateRequest ur = new UpdateRequest();
+ ur.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+ for (int i = 0; i < bsz; i++) {
+ ur.add("id", String.valueOf((j * bsz) + i), "desc_s", TestUtil.randomSimpleString(random(), 10, 50));
+ }
+ cluster.getSolrClient().request(ur, COLLECTION_NAME);
+ docCount += bsz;
}
+ QueryResponse qr = cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*").setRows(0));
+ assertEquals(docCount, qr.getResults().getNumFound());
+
+ DocCollection coll = cluster.getSolrClient().getClusterStateProvider().getCollection(COLLECTION_NAME);
+ HashMap<String, Long> docCounts = new HashMap<>();
+ long totalDocsFromCores = 0;
+ for (Slice slice : coll.getSlices()) {
+ Replica replica = slice.getLeader();
+ try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getBaseUrl()).build()) {
+ long count = ExportTool.getDocCount(replica.getCoreName(), client);
+ docCounts.put(replica.getCoreName(), count);
+ totalDocsFromCores += count;
+ }
+ }
+ assertEquals(docCount, totalDocsFromCores);
+
+ ExportTool.MultiThreadedRunner info = null;
+ String absolutePath = null;
+
+ info = new ExportTool.MultiThreadedRunner(url);
+ info.output = System.out;
+ absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
+ info.setOutFormat(absolutePath, "javabin");
+ info.setLimit("-1");
+ info.exportDocs();
+ assertJavabinDocsCount(info, docCount);
+ for (Map.Entry<String, Long> e : docCounts.entrySet()) {
+ assertEquals(e.getValue().longValue(), info.corehandlers.get(e.getKey()).receivedDocs.get());
+ }
+ info = new ExportTool.MultiThreadedRunner(url);
+ info.output = System.out;
+ absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
+ info.setOutFormat(absolutePath, "jsonl");
+ info.fields = "id,desc_s";
+ info.setLimit("-1");
+ info.exportDocs();
+ long actual = ((ExportTool.JsonSink) info.sink).docs.get();
+ assertTrue("docs written :" + actual + "docs produced : " + info.docsWritten.get(), actual >= docCount);
+ assertJsonDocsCount(info, docCount);
} finally {
cluster.shutdown();
}
}
+
+
+ private void assertJavabinDocsCount(ExportTool.Info info, int expected) throws IOException {
+ assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected);
+ FileInputStream fis = new FileInputStream(info.out);
+ try {
+ int[] count = new int[]{0};
+ FastInputStream in = FastInputStream.wrap(fis);
+ new JavaBinUpdateRequestCodec()
+ .unmarshal(in, (document, req, commitWithin, override) -> {
+ assertEquals(2, document.size());
+ count[0]++;
+ });
+ assertTrue(count[0] >= expected);
+ } finally {
+ fis.close();
+ }
+ }
+
+ private void assertJsonDocsCount(ExportTool.Info info, int expected) throws IOException {
+ assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected);
+
+ JsonRecordReader jsonReader;
+ Reader rdr;
+ jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
+ rdr = new InputStreamReader(new FileInputStream(info.out), StandardCharsets.UTF_8);
+ try {
+ int[] count = new int[]{0};
+ jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
+ assertTrue(count[0] >= expected);
+ } finally {
+ rdr.close();
+ }
+ }
}
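
For context on what each producer does: CoreHandler pages through one core with a cursor mark, sorted on the uniqueKey (which is why fetchUniqueKey() runs first). A rough standalone equivalent in plain SolrJ follows; the base URL, core name, and uniqueKey field ("id") are placeholders, and the committed code streams javabin callbacks via StreamingBinaryResponseParser instead of materializing a QueryResponse.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.CursorMarkParams;

public class CursorMarkSketch {
  public static void main(String[] args) throws Exception {
    try (HttpSolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr").build()) {
      SolrQuery q = new SolrQuery("*:*");
      q.setRows(1000);
      q.set("distrib", "false");                 // query this core only, no fan-out
      q.setSort(SolrQuery.SortClause.asc("id")); // cursorMark requires a uniqueKey sort
      String cursorMark = CursorMarkParams.CURSOR_MARK_START;
      while (true) {
        q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
        QueryResponse rsp = client.query("techproducts_shard1_replica_n1", q);
        for (SolrDocument doc : rsp.getResults()) {
          System.out.println(doc.getFieldValue("id")); // stands in for queue.offer(doc, ...)
        }
        String next = rsp.getNextCursorMark();
        if (next == null || next.equals(cursorMark)) break; // same mark twice: no more pages
        cursorMark = next;
      }
    }
  }
}

Receiving the same cursor mark twice is Solr's end-of-results signal, which is the same termination test exportDocsFromCore applies above.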