Posted to commits@lucene.apache.org by no...@apache.org on 2019/08/13 08:54:28 UTC

[lucene-solr] branch master updated: SOLR-13688: Run the bin/solr export command multithreaded

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 07ca02b  SOLR-13688: Run the bin/solr export command multithreaded
07ca02b is described below

commit 07ca02b7375a9c2564aba4c905e880a32d16e1df
Author: noble <no...@apache.org>
AuthorDate: Tue Aug 13 18:54:05 2019 +1000

    SOLR-13688: Run the bin/solr export command multithreaded
---
 .../src/java/org/apache/solr/util/ExportTool.java  | 344 ++++++++++++++++-----
 .../test/org/apache/solr/util/TestExportTool.java  | 188 +++++++----
 2 files changed, 398 insertions(+), 134 deletions(-)
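
For context, the command this change parallelizes is the export tool shipped with bin/solr. A typical invocation looks roughly like the following (the option names match those read from the command line in the code below; the host, collection, and output path are placeholders, and the exact flag syntax may differ slightly):

    bin/solr export -url http://localhost:8983/solr/techproducts -query "*:*" -format jsonl -out /tmp/techproducts -limit -1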

diff --git a/solr/core/src/java/org/apache/solr/util/ExportTool.java b/solr/core/src/java/org/apache/solr/util/ExportTool.java
index 7b5525e..ab29800 100644
--- a/solr/core/src/java/org/apache/solr/util/ExportTool.java
+++ b/solr/core/src/java/org/apache/solr/util/ExportTool.java
@@ -17,42 +17,65 @@
 
 package org.apache.solr.util;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
+import java.io.Writer;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.StreamingResponseCallback;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.StreamingBinaryResponseParser;
 import org.apache.solr.client.solrj.request.GenericSolrRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CursorMarkParams;
 import org.apache.solr.common.params.MapSolrParams;
-import org.apache.solr.common.util.FastWriter;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.JavaBinCodec;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrJSONWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
 
 import static org.apache.solr.common.params.CommonParams.FL;
 import static org.apache.solr.common.params.CommonParams.JAVABIN;
+import static org.apache.solr.common.params.CommonParams.Q;
+import static org.apache.solr.common.params.CommonParams.SORT;
+import static org.apache.solr.common.util.JavaBinCodec.SOLRINPUTDOC;
 
 public class ExportTool extends SolrCLI.ToolBase {
   @Override
@@ -65,7 +88,7 @@ public class ExportTool extends SolrCLI.ToolBase {
     return OPTIONS;
   }
 
-  public static class Info {
+  public static abstract class Info {
     String baseurl;
     String format;
     String query;
@@ -73,10 +96,12 @@ public class ExportTool extends SolrCLI.ToolBase {
     String out;
     String fields;
     long limit = 100;
-    long docsWritten = 0;
+    AtomicLong docsWritten = new AtomicLong(0);
+    int bufferSize = 1024 * 1024;
     PrintStream output;
-    //for testing purposes only
-    public SolrClient solrClient;
+    String uniqueKey;
+    CloudSolrClient solrClient;
+    DocsSink sink;
 
 
     public Info(String url) {
@@ -117,60 +142,24 @@ public class ExportTool extends SolrCLI.ToolBase {
       return JAVABIN.equals(format) ? new JavabinSink(this) : new JsonSink(this);
     }
 
-    void exportDocsWithCursorMark() throws SolrServerException, IOException {
-      DocsSink sink = getSink();
+    abstract void exportDocs() throws Exception;
+
+    void fetchUniqueKey() throws SolrServerException, IOException {
       solrClient = new CloudSolrClient.Builder(Collections.singletonList(baseurl)).build();
-      NamedList<Object> rsp1 = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey",
+      NamedList<Object> response = solrClient.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/schema/uniquekey",
           new MapSolrParams(Collections.singletonMap("collection", coll))));
-      String uniqueKey = (String) rsp1.get("uniqueKey");
-
-      sink.start();
-      try {
-        NamedList<Object> rsp;
-        SolrQuery q = (new SolrQuery(query))
-            .setParam("collection", coll)
-            .setRows(100)
-            .setSort(SolrQuery.SortClause.asc(uniqueKey));
-        if (fields != null) {
-          q.setParam(FL, fields);
-        }
-
-        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
-        boolean done = false;
-        StreamingResponseCallback streamer = getStreamer(sink);
-
-        if(output!= null) output.println("Exporting data to : "+ out);
-        while (!done) {
-          if (docsWritten >= limit) break;
-          QueryRequest request = new QueryRequest(q);
-          request.setResponseParser(new StreamingBinaryResponseParser(streamer));
-          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
-          rsp = solrClient.request(request);
-          String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT);
-          if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) {
-            break;
-          }
-          cursorMark = nextCursorMark;
-          if(output!= null) output.print(".");
-        }
-        if(output!= null) output.println("\n DONE!");
-      } finally {
-        sink.end();
-        solrClient.close();
-
-      }
+      uniqueKey = (String) response.get("uniqueKey");
     }
 
-    private StreamingResponseCallback getStreamer(DocsSink sink) {
+    public static StreamingResponseCallback getStreamer(Consumer<SolrDocument> sink) {
       return new StreamingResponseCallback() {
         @Override
         public void streamSolrDocument(SolrDocument doc) {
           try {
             sink.accept(doc);
-          } catch (IOException e) {
+          } catch (Exception e) {
             throw new RuntimeException(e);
           }
-          docsWritten++;
         }
 
         @Override
@@ -186,21 +175,24 @@ public class ExportTool extends SolrCLI.ToolBase {
 
   @Override
   protected void runImpl(CommandLine cli) throws Exception {
-    Info info = new Info(cli.getOptionValue("url"));
+    String url = cli.getOptionValue("url");
+    Info info = new MultiThreadedRunner(url);
     info.query = cli.getOptionValue("query", "*:*");
     info.setOutFormat(cli.getOptionValue("out"), cli.getOptionValue("format"));
     info.fields = cli.getOptionValue("fields");
     info.setLimit(cli.getOptionValue("limit", "100"));
     info.output = super.stdout;
-    info.exportDocsWithCursorMark();
+    info.exportDocs();
   }
 
   interface DocsSink {
-    void start() throws IOException;
+    default void start() throws IOException {
+    }
 
-    void accept(SolrDocument document) throws IOException;
+    void accept(SolrDocument document) throws IOException, InterruptedException;
 
-    void end() throws IOException;
+    default void end() throws IOException {
+    }
   }
 
   private static final Option[] OPTIONS = {
@@ -236,11 +228,13 @@ public class ExportTool extends SolrCLI.ToolBase {
           .create("fields")
   };
 
-  private static class JsonSink implements DocsSink {
+  static class JsonSink implements DocsSink {
     private final Info info;
-    private SolrJSONWriter jsonw;
-    private FastWriter writer;
-    private FileOutputStream fos;
+    private CharArr charArr = new CharArr(1024 * 2);
+    JSONWriter jsonWriter = new JSONWriter(charArr, -1);
+    private Writer writer;
+    private OutputStream fos;
+    public AtomicLong docs = new AtomicLong();
 
     public JsonSink(Info info) {
       this.info = info;
@@ -249,24 +243,27 @@ public class ExportTool extends SolrCLI.ToolBase {
     @Override
     public void start() throws IOException {
       fos = new FileOutputStream(info.out);
-      writer = FastWriter.wrap(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
-      jsonw = new SolrJSONWriter(writer);
-      jsonw.setIndent(false);
+      if (info.bufferSize > 0) {
+        fos = new BufferedOutputStream(fos, info.bufferSize);
+      }
+      writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
 
     }
 
     @Override
     public void end() throws IOException {
-      jsonw.close();
+      writer.flush();
+      fos.flush();
       fos.close();
-
     }
 
     @Override
-    public void accept(SolrDocument doc) throws IOException {
+    public synchronized void accept(SolrDocument doc) throws IOException {
+      docs.incrementAndGet();
+      charArr.reset();
       Map m = new LinkedHashMap(doc.size());
       doc.forEach((s, field) -> {
-        if (s.equals("_version_")) return;
+        if (s.equals("_version_") || s.equals("_root_")) return;
         if (field instanceof List) {
           if (((List) field).size() == 1) {
             field = ((List) field).get(0);
@@ -274,17 +271,16 @@ public class ExportTool extends SolrCLI.ToolBase {
         }
         m.put(s, field);
       });
-      jsonw.writeObj(m);
-      writer.flush();
+      jsonWriter.write(m);
+      writer.write(charArr.getArray(), charArr.getStart(), charArr.getEnd());
       writer.append('\n');
-
     }
   }
 
   private static class JavabinSink implements DocsSink {
     private final Info info;
     JavaBinCodec codec;
-    FileOutputStream fos;
+    OutputStream fos;
 
     public JavabinSink(Info info) {
       this.info = info;
@@ -293,6 +289,9 @@ public class ExportTool extends SolrCLI.ToolBase {
     @Override
     public void start() throws IOException {
       fos = new FileOutputStream(info.out);
+      if (info.bufferSize > 0) {
+        fos = new BufferedOutputStream(fos, info.bufferSize);
+      }
       codec = new JavaBinCodec(fos, null);
       codec.writeTag(JavaBinCodec.NAMED_LST, 2);
       codec.writeStr("params");
@@ -306,23 +305,208 @@ public class ExportTool extends SolrCLI.ToolBase {
     public void end() throws IOException {
       codec.writeTag(JavaBinCodec.END);
       codec.close();
+      fos.flush();
       fos.close();
 
     }
+    private BiConsumer<String, Object> bic= new BiConsumer<>() {
+      @Override
+      public void accept(String s, Object o) {
+        try {
+          if (s.equals("_version_") || s.equals("_root_")) return;
+          codec.writeExternString(s);
+          codec.writeVal(o);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
 
     @Override
-    public void accept(SolrDocument doc) throws IOException {
-      SolrInputDocument document = new SolrInputDocument();
-      doc.forEach((s, o) -> {
-        if (s.equals("_version_")) return;
-        if (o instanceof List) {
-          if (((List) o).size() == 1) o = ((List) o).get(0);
+    public synchronized void accept(SolrDocument doc) throws IOException {
+      int sz = doc.size();
+      if(doc.containsKey("_version_")) sz--;
+      if(doc.containsKey("_root_")) sz--;
+      codec.writeTag(SOLRINPUTDOC, sz);
+      codec.writeFloat(1f); // document boost
+      doc.forEach(bic);
+    }
+  }
+
+  static class MultiThreadedRunner extends Info {
+    ExecutorService producerThreadpool, consumerThreadpool;
+    ArrayBlockingQueue<SolrDocument> queue = new ArrayBlockingQueue(1000);
+    SolrDocument EOFDOC = new SolrDocument();
+    volatile boolean failed = false;
+    Map<String, CoreHandler> corehandlers = new HashMap();
+
+    public MultiThreadedRunner(String url) {
+      super(url);
+    }
+
+
+    @Override
+    void exportDocs() throws Exception {
+      sink = getSink();
+      fetchUniqueKey();
+      ClusterStateProvider stateProvider = solrClient.getClusterStateProvider();
+      DocCollection coll = stateProvider.getCollection(this.coll);
+      Map<String, Slice> m = coll.getSlicesMap();
+      producerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(m.size(),
+          new DefaultSolrThreadFactory("solrcli-exporter-producers"));
+      consumerThreadpool = ExecutorUtil.newMDCAwareFixedThreadPool(1,
+          new DefaultSolrThreadFactory("solrcli-exporter-consumer"));
+      sink.start();
+      CountDownLatch consumerlatch = new CountDownLatch(1);
+      try {
+        addConsumer(consumerlatch);
+        addProducers(m);
+        if (output != null) {
+          output.println("NO of shards : " + corehandlers.size());
         }
-        document.addField(s, o);
+        CountDownLatch producerLatch = new CountDownLatch(corehandlers.size());
+        corehandlers.forEach((s, coreHandler) -> producerThreadpool.submit(() -> {
+          try {
+            coreHandler.exportDocsFromCore();
+          } catch (Exception e) {
+            if(output != null) output.println("Error exporting docs from : "+s);
+
+          }
+          producerLatch.countDown();
+        }));
+
+        producerLatch.await();
+        queue.offer(EOFDOC, 10, TimeUnit.SECONDS);
+        consumerlatch.await();
+      } finally {
+        sink.end();
+        solrClient.close();
+        producerThreadpool.shutdownNow();
+        consumerThreadpool.shutdownNow();
+        if (failed) {
+          try {
+            Files.delete(new File(out).toPath());
+          } catch (IOException e) {
+            //ignore
+          }
+        }
+      }
+    }
+
+    private void addProducers(Map<String, Slice> m) {
+      for (Map.Entry<String, Slice> entry : m.entrySet()) {
+        Slice slice = entry.getValue();
+        Replica replica = slice.getLeader();
+        if (replica == null) replica = slice.getReplicas().iterator().next();// get a random replica
+        CoreHandler coreHandler = new CoreHandler(replica);
+        corehandlers.put(replica.getCoreName(), coreHandler);
+      }
+    }
+
+    private void addConsumer(CountDownLatch consumerlatch) {
+      consumerThreadpool.submit(() -> {
+        while (true) {
+          SolrDocument doc = null;
+          try {
+            doc = queue.poll(30, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            if (output != null) output.println("Consumer interrupted");
+            failed = true;
+            break;
+          }
+          if (doc == EOFDOC) break;
+          try {
+            if (docsWritten.get() > limit) continue;
+            sink.accept(doc);
+            docsWritten.incrementAndGet();
+          } catch (Exception e) {
+            if (output != null) output.println("Failed to write to file " + e.getMessage());
+            failed = true;
+          }
+        }
+        consumerlatch.countDown();
       });
+    }
+
 
-      codec.writeSolrInputDocument(document);
+    class CoreHandler {
+      final Replica replica;
+      long expectedDocs;
+      AtomicLong receivedDocs = new AtomicLong();
 
+      CoreHandler(Replica replica) {
+        this.replica = replica;
+      }
+
+      boolean exportDocsFromCore()
+          throws IOException, SolrServerException {
+        HttpSolrClient client = new HttpSolrClient.Builder(baseurl).build();
+        try {
+          expectedDocs = getDocCount(replica.getCoreName(), client);
+          GenericSolrRequest request;
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.add(Q, query);
+          if (fields != null) params.add(FL, fields);
+          params.add(SORT, uniqueKey + " asc");
+          params.add(CommonParams.DISTRIB, "false");
+          params.add(CommonParams.ROWS, "1000");
+          String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+          Consumer<SolrDocument> wrapper = doc -> {
+            try {
+              queue.offer(doc, 10, TimeUnit.SECONDS);
+              receivedDocs.incrementAndGet();
+            } catch (InterruptedException e) {
+              failed = true;
+              if (output != null) output.println("Failed to write docs: " + e.getMessage());
+            }
+          };
+          StreamingBinaryResponseParser responseParser = new StreamingBinaryResponseParser(getStreamer(wrapper));
+          while (true) {
+            if (failed) return false;
+            if (docsWritten.get() > limit) return true;
+            params.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+            request = new GenericSolrRequest(SolrRequest.METHOD.GET,
+                "/" + replica.getCoreName() + "/select", params);
+            request.setResponseParser(responseParser);
+            try {
+              NamedList<Object> rsp = client.request(request);
+              String nextCursorMark = (String) rsp.get(CursorMarkParams.CURSOR_MARK_NEXT);
+              if (nextCursorMark == null || Objects.equals(cursorMark, nextCursorMark)) {
+                if (output != null)
+                  output.println(StrUtils.formatString("\nExport complete for : {0}, docs : {1}", replica.getCoreName(), receivedDocs.get()));
+                if (expectedDocs != receivedDocs.get()) {
+                  if (output != null) {
+                    output.println(StrUtils.formatString("Could not download all docs for core {0} , expected: {1} , actual",
+                        replica.getCoreName(), expectedDocs, receivedDocs));
+                    return false;
+                  }
+                }
+                return true;
+              }
+              cursorMark = nextCursorMark;
+              if (output != null) output.print(".");
+            } catch (SolrServerException e) {
+              if(output != null) output.println("Error reading from server "+ replica.getBaseUrl()+"/"+ replica.getCoreName());
+              failed = true;
+              return false;
+            }
+          }
+        } finally {
+          client.close();
+        }
+      }
     }
   }
+
+
+  static long getDocCount(String coreName, HttpSolrClient client) throws SolrServerException, IOException {
+    SolrQuery q = new SolrQuery("*:*");
+    q.setRows(0);
+    q.add("distrib", "false");
+    GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET,
+        "/" + coreName + "/select", q);
+    NamedList<Object> res = client.request(request);
+    SolrDocumentList sdl = (SolrDocumentList) res.get("response");
+    return sdl.getNumFound();
+  }
 }
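
The heart of the change above is a producer/consumer pipeline: MultiThreadedRunner starts one producer thread per shard, each streaming documents from its core into a bounded ArrayBlockingQueue, while a single consumer thread drains the queue into the output sink; an empty sentinel document (EOFDOC) tells the consumer when the producers are done. Below is a minimal, self-contained sketch of that pattern, assuming nothing beyond the JDK; the class and variable names are illustrative only and are not the ones used in ExportTool.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerSketch {
  private static final String EOF = "<<EOF>>"; // sentinel value, analogous to EOFDOC

  public static void main(String[] args) throws Exception {
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
    // stand-ins for the per-shard result streams
    List<List<String>> shards = List.of(
        List.of("doc1", "doc2"), List.of("doc3"), List.of("doc4", "doc5"));

    ExecutorService producers = Executors.newFixedThreadPool(shards.size());
    ExecutorService consumer = Executors.newSingleThreadExecutor();
    CountDownLatch producerLatch = new CountDownLatch(shards.size());
    CountDownLatch consumerLatch = new CountDownLatch(1);

    // single consumer: the only thread that touches the output
    consumer.submit(() -> {
      while (true) {
        String doc;
        try {
          doc = queue.poll(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          break;
        }
        if (doc == null || doc.equals(EOF)) break;
        System.out.println("wrote " + doc); // stand-in for sink.accept(doc)
      }
      consumerLatch.countDown();
    });

    // one producer per shard, all feeding the shared queue
    for (List<String> shard : shards) {
      producers.submit(() -> {
        try {
          for (String doc : shard) queue.offer(doc, 10, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) {
        } finally {
          producerLatch.countDown();
        }
      });
    }

    producerLatch.await();                  // wait until every shard is drained
    queue.offer(EOF, 10, TimeUnit.SECONDS); // then tell the consumer to stop
    consumerLatch.await();
    producers.shutdownNow();
    consumer.shutdownNow();
  }
}

Funnelling everything through one consumer keeps the sinks simple: only one thread ever writes to the output file, and the synchronized accept() methods on JsonSink and JavabinSink act as an extra guard rather than a coordination point.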
diff --git a/solr/core/src/test/org/apache/solr/util/TestExportTool.java b/solr/core/src/test/org/apache/solr/util/TestExportTool.java
index 1fe5f66..fdfb3c0 100644
--- a/solr/core/src/test/org/apache/solr/util/TestExportTool.java
+++ b/solr/core/src/test/org/apache/solr/util/TestExportTool.java
@@ -19,14 +19,18 @@ package org.apache.solr.util;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
@@ -34,6 +38,9 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.util.FastInputStream;
 import org.apache.solr.common.util.JsonRecordReader;
 
@@ -71,82 +78,155 @@ public class TestExportTool extends SolrCloudTestCase {
       String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
 
 
-      ExportTool.Info info = new ExportTool.Info(url);
-
+      ExportTool.Info info = new ExportTool.MultiThreadedRunner(url);
       String absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
       info.setOutFormat(absolutePath, "jsonl");
       info.setLimit("200");
-      info.exportDocsWithCursorMark();
-
-      assertTrue(info.docsWritten >= 200);
-      JsonRecordReader jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
-      Reader rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8);
-      try {
-        int[] count = new int[]{0};
-        jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
-        assertTrue(count[0] >= 200);
-      } finally {
-        rdr.close();
-      }
+      info.fields = "id,desc_s";
+      info.exportDocs();
 
+      assertJsonDocsCount(info, 200);
 
-      info = new ExportTool.Info(url);
+      info = new ExportTool.MultiThreadedRunner(url);
       absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
       info.setOutFormat(absolutePath, "jsonl");
       info.setLimit("-1");
-      info.exportDocsWithCursorMark();
-
-      assertTrue(info.docsWritten >= 1000);
-      jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
-      rdr = new InputStreamReader(new FileInputStream( absolutePath), StandardCharsets.UTF_8);
-      try {
-        int[] count = new int[]{0};
-        jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
-        assertTrue(count[0] >= 1000);
-      } finally {
-        rdr.close();
-      }
+      info.fields = "id,desc_s";
+      info.exportDocs();
 
+      assertJsonDocsCount(info, 1000);
 
-      info = new ExportTool.Info(url);
+      info = new ExportTool.MultiThreadedRunner(url);
       absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
       info.setOutFormat(absolutePath, "javabin");
       info.setLimit("200");
-      info.exportDocsWithCursorMark();
-      assertTrue(info.docsWritten >= 200);
-
-      FileInputStream fis = new FileInputStream(absolutePath);
-      try {
-        int[] count = new int[]{0};
-        FastInputStream in = FastInputStream.wrap(fis);
-        new JavaBinUpdateRequestCodec()
-            .unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
-        assertTrue(count[0] >= 200);
-      } finally {
-        fis.close();
-      }
+      info.fields = "id,desc_s";
+      info.exportDocs();
 
-      info = new ExportTool.Info(url);
+      assertJavabinDocsCount(info, 200);
+
+      info = new ExportTool.MultiThreadedRunner(url);
       absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
       info.setOutFormat(absolutePath, "javabin");
       info.setLimit("-1");
-      info.exportDocsWithCursorMark();
-      assertTrue(info.docsWritten >= 1000);
-
-      fis = new FileInputStream(absolutePath);
-      try {
-        int[] count = new int[]{0};
-        FastInputStream in = FastInputStream.wrap(fis);
-        new JavaBinUpdateRequestCodec()
-            .unmarshal(in, (document, req, commitWithin, override) -> count[0]++);
-        assertTrue(count[0] >= 1000);
-      } finally {
-        fis.close();
+      info.fields = "id,desc_s";
+      info.exportDocs();
+      assertJavabinDocsCount(info, 1000);
+
+    } finally {
+      cluster.shutdown();
+
+    }
+  }
+
+  @Nightly
+  public void testVeryLargeCluster() throws Exception {
+    String COLLECTION_NAME = "veryLargeColl";
+    MiniSolrCloudCluster cluster = configureCluster(4)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+
+    try {
+      CollectionAdminRequest
+          .createCollection(COLLECTION_NAME, "conf", 8, 1)
+          .setMaxShardsPerNode(10)
+          .process(cluster.getSolrClient());
+      cluster.waitForActiveCollection(COLLECTION_NAME, 8, 8);
+
+      String tmpFileLoc = new File(cluster.getBaseDir().toFile().getAbsolutePath() +
+          File.separator).getPath();
+      String url = cluster.getRandomJetty(random()).getBaseUrl() + "/" + COLLECTION_NAME;
+
+
+      int docCount = 0;
+
+      for (int j = 0; j < 4; j++) {
+        int bsz = 10000;
+        UpdateRequest ur = new UpdateRequest();
+        ur.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+        for (int i = 0; i < bsz; i++) {
+          ur.add("id", String.valueOf((j * bsz) + i), "desc_s", TestUtil.randomSimpleString(random(), 10, 50));
+        }
+        cluster.getSolrClient().request(ur, COLLECTION_NAME);
+        docCount += bsz;
       }
 
+      QueryResponse qr = cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*").setRows(0));
+      assertEquals(docCount, qr.getResults().getNumFound());
+
+      DocCollection coll = cluster.getSolrClient().getClusterStateProvider().getCollection(COLLECTION_NAME);
+      HashMap<String, Long> docCounts = new HashMap<>();
+      long totalDocsFromCores = 0;
+      for (Slice slice : coll.getSlices()) {
+        Replica replica = slice.getLeader();
+        try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getBaseUrl()).build()) {
+          long count = ExportTool.getDocCount(replica.getCoreName(), client);
+          docCounts.put(replica.getCoreName(), count);
+          totalDocsFromCores += count;
+        }
+      }
+      assertEquals(docCount, totalDocsFromCores);
+
+      ExportTool.MultiThreadedRunner info = null;
+      String absolutePath = null;
+
+      info = new ExportTool.MultiThreadedRunner(url);
+      info.output = System.out;
+      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".javabin";
+      info.setOutFormat(absolutePath, "javabin");
+      info.setLimit("-1");
+      info.exportDocs();
+      assertJavabinDocsCount(info, docCount);
+      for (Map.Entry<String, Long> e : docCounts.entrySet()) {
+        assertEquals(e.getValue().longValue(), info.corehandlers.get(e.getKey()).receivedDocs.get());
+      }
+      info = new ExportTool.MultiThreadedRunner(url);
+      info.output = System.out;
+      absolutePath = tmpFileLoc + COLLECTION_NAME + random().nextInt(100000) + ".json";
+      info.setOutFormat(absolutePath, "jsonl");
+      info.fields = "id,desc_s";
+      info.setLimit("-1");
+      info.exportDocs();
+      long actual = ((ExportTool.JsonSink) info.sink).docs.get();
+      assertTrue("docs written :" + actual + "docs produced : " + info.docsWritten.get(), actual >= docCount);
+      assertJsonDocsCount(info, docCount);
     } finally {
       cluster.shutdown();
 
     }
   }
+
+
+  private void assertJavabinDocsCount(ExportTool.Info info, int expected) throws IOException {
+    assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected);
+    FileInputStream fis = new FileInputStream(info.out);
+    try {
+      int[] count = new int[]{0};
+      FastInputStream in = FastInputStream.wrap(fis);
+      new JavaBinUpdateRequestCodec()
+          .unmarshal(in, (document, req, commitWithin, override) -> {
+            assertEquals(2, document.size());
+            count[0]++;
+          });
+      assertTrue(count[0] >= expected);
+    } finally {
+      fis.close();
+    }
+  }
+
+  private void assertJsonDocsCount(ExportTool.Info info, int expected) throws IOException {
+    assertTrue("" + info.docsWritten.get() + " expected " + expected, info.docsWritten.get() >= expected);
+
+    JsonRecordReader jsonReader;
+    Reader rdr;
+    jsonReader = JsonRecordReader.getInst("/", Arrays.asList("$FQN:/**"));
+    rdr = new InputStreamReader(new FileInputStream(info.out), StandardCharsets.UTF_8);
+    try {
+      int[] count = new int[]{0};
+      jsonReader.streamRecords(rdr, (record, path) -> count[0]++);
+      assertTrue(count[0] >= expected);
+    } finally {
+      rdr.close();
+    }
+  }
 }
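
As the new tests show, the exporter can also be driven programmatically rather than through bin/solr. A minimal sketch mirroring the test code (URL, output path, and field list are placeholders; the tests live in the same package, which is what lets them set the package-private fields member directly):

    ExportTool.Info info = new ExportTool.MultiThreadedRunner("http://localhost:8983/solr/techproducts");
    info.setOutFormat("/tmp/techproducts.javabin", "javabin");
    info.setLimit("-1");
    info.fields = "id,desc_s";
    info.exportDocs();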