Posted to issues@ozone.apache.org by "xichen01 (via GitHub)" <gi...@apache.org> on 2023/09/26 12:40:52 UTC

[GitHub] [ozone] xichen01 commented on a diff in pull request #5358: HDDS-9348. Improving parsing speed of DB Scanner

xichen01 commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1337139269


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -364,4 +416,252 @@ private String removeTrailingSlashIfNeeded(String dbPath) {
   public Class<?> getParentType() {
     return RDBParser.class;
   }
+
+  /**
+   * Utility for centralized JSON serialization using Jackson.
+   */
+  @VisibleForTesting
+  public static class JsonSerializationHelper {
+    /**
+     * To stay consistent with the original Gson-based output, this
+     * configuration makes the output from Jackson closely match the
+     * output of Gson.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+        // Ignore standard getters.
+        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+        // Ignore boolean "is" getters.
+        .setVisibility(PropertyAccessor.IS_GETTER,
+            JsonAutoDetect.Visibility.NONE)
+        // Exclude null values.
+        .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    public static final ObjectWriter WRITER;
+
+    static {
+      if (compact) {
+        WRITER = OBJECT_MAPPER.writer();
+      } else {
+        WRITER = OBJECT_MAPPER.writerWithDefaultPrettyPrinter();
+      }
+    }
+
+    public static ObjectWriter getWriter() {
+      return WRITER;
+    }
+  }
+
+
+  private static class Task implements Callable<Void> {
+
+    private final DBColumnFamilyDefinition dbColumnFamilyDefinition;
+    private final ArrayList<ByteArrayKeyValue> batch;
+    private final LogWriter logWriter;
+    private static final ObjectWriter WRITER =
+        JsonSerializationHelper.getWriter();
+    private final long sequenceId;
+    private final boolean withKey;
+    private final boolean schemaV3;
+
+    Task(DBColumnFamilyDefinition dbColumnFamilyDefinition,
+         ArrayList<ByteArrayKeyValue> batch, LogWriter logWriter,
+         long sequenceId, boolean withKey, boolean schemaV3) {
+      this.dbColumnFamilyDefinition = dbColumnFamilyDefinition;
+      this.batch = batch;
+      this.logWriter = logWriter;
+      this.sequenceId = sequenceId;
+      this.withKey = withKey;
+      this.schemaV3 = schemaV3;
+    }
+
+    @Override
+    public Void call() {
+      try {
+        ArrayList<String> results = new ArrayList<>(batch.size());
+        for (ByteArrayKeyValue byteArrayKeyValue : batch) {
+          StringBuilder sb = new StringBuilder();
+          if (sequenceId == FIRST_SEQUENCE_ID && !results.isEmpty()) {

Review Comment:
   This has been fixed.
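
The hunk above is cut off at the separator check in `Task.call()`. The sketch below illustrates the idea behind that check. It assumes that each batch carries a sequence ID assigned in RocksDB iterator order, and that only the very first record of the whole output omits the leading `", "`. Worker threads may finish in any order, so a single writer buffers the batches and emits them strictly by sequence ID. `OrderedBatchWriter`, `formatBatch`, `submit`, and the `TreeMap` buffer are hypothetical illustrations, not the PR's `LogWriter`. The value of `FIRST_SEQUENCE_ID` is also assumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not the PR's LogWriter): buffers formatted batches
 * that may finish out of order and appends them in sequence-ID order.
 */
public class OrderedBatchWriter {
  // Assumed value for this sketch; the PR defines its own FIRST_SEQUENCE_ID.
  static final long FIRST_SEQUENCE_ID = 0;

  private final TreeMap<Long, String> pending = new TreeMap<>();
  private final StringBuilder out = new StringBuilder();
  private long nextToWrite = FIRST_SEQUENCE_ID;

  /** Joins records with ", " and prefixes ", " unless this is the first batch. */
  public static String formatBatch(long sequenceId, List<String> records) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < records.size(); i++) {
      // Only the first record of the first batch starts the output.
      boolean firstInOutput = sequenceId == FIRST_SEQUENCE_ID && i == 0;
      if (!firstInOutput) {
        sb.append(", ");
      }
      sb.append(records.get(i));
    }
    return sb.toString();
  }

  /** Accepts a batch from any worker and writes every batch that is now in order. */
  public synchronized void submit(long sequenceId, String formatted) {
    pending.put(sequenceId, formatted);
    while (!pending.isEmpty() && pending.firstKey() == nextToWrite) {
      out.append(pending.pollFirstEntry().getValue());
      nextToWrite++;
    }
  }

  public synchronized String output() {
    return out.toString();
  }

  public static void main(String[] args) {
    OrderedBatchWriter writer = new OrderedBatchWriter();
    String b0 = formatBatch(0, Arrays.asList("\"a\"", "\"b\""));
    String b1 = formatBatch(1, Arrays.asList("\"c\""));
    String b2 = formatBatch(2, Arrays.asList("\"d\"", "\"e\""));
    // Simulate worker threads finishing in the order 2, 0, 1.
    writer.submit(2, b2);
    writer.submit(0, b0);
    writer.submit(1, b1);
    // Prints: [ "a", "b", "c", "d", "e" ]
    System.out.println("[ " + writer.output() + " ]");
  }
}
```

Putting the separator decision in the worker keeps the single writer thread trivial. The cost is that the worker must know whether its batch is the first one.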



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(

Review Comment:
   Done.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {

Review Comment:
   Changed to shut the pool down directly with `threadPool.shutdownNow()`.
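
Below is a general `java.util.concurrent` illustration of why a direct `shutdownNow()` can be enough. It is not DBScanner's code. It assumes that every submitted `Future` has already been awaited before teardown, which `processRecords` is presumed (not confirmed here) to do. `ShutdownNowSketch` and `runAndShutdown` are hypothetical names, and `Executors.newFixedThreadPool(4)` stands in for the PR's fixed-size `ThreadPoolExecutor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ShutdownNowSketch {
  /** Submits taskCount squaring tasks, sums the results, then stops the pool. */
  static int runAndShutdown(int taskCount) {
    // Stand-in for the PR's fixed-size ThreadPoolExecutor.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<Integer>> futures = new ArrayList<>();
      for (int i = 0; i < taskCount; i++) {
        final int n = i;
        futures.add(pool.submit(() -> n * n));
      }
      int sum = 0;
      // Once get() has returned for every future, no work remains queued.
      for (Future<Integer> f : futures) {
        sum += f.get();
      }
      return sum;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("task failed", e.getCause());
    } finally {
      // Normal path: nothing is pending, so this only stops idle workers.
      // Error path: it interrupts running tasks and returns unstarted ones
      // instead of blocking for up to 60 seconds in awaitTermination().
      List<Runnable> neverStarted = pool.shutdownNow();
      if (!neverStarted.isEmpty()) {
        System.err.println(neverStarted.size() + " task(s) were never started");
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(runAndShutdown(5)); // prints 30
  }
}
```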



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+          err().println("Thread pool did not terminate; forced shutdown.");
+        }
+      } catch (InterruptedException ie) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      logWriter.stop();

Review Comment:
   Done.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
 
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter,
+                               ExecutorService threadPool, boolean schemaV3) {
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+          err().println("Thread pool did not terminate; forced shutdown.");
+        }
+      } catch (InterruptedException ie) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      logWriter.stop();
+      logWriter.join();
+      // End JSON object (map) or array
+      printWriter.println(withKey ? " }" : " ]");
+    }
+
+    return !exception;
+  }
 
+  private void processRecords(ManagedRocksIterator iterator,
+                                 DBColumnFamilyDefinition dbColumnFamilyDef,
+                                 LogWriter logWriter,
+                                 ExecutorService threadPool,
+                                 boolean schemaV3) throws InterruptedException {
     if (startKey != null) {
       iterator.get().seek(getValueObject(dbColumnFamilyDef));
     }
-
-    if (withKey) {
-      // Start JSON object (map)
-      out.print("{ ");
-    } else {
-      // Start JSON array
-      out.print("[ ");
-    }
-
+    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(BATCH_SIZE);
+    // Used to ensure that the output of a multi-threaded parsed Json is in
+    // the same order as the RocksDB iterator.
+    long sequenceId = FIRST_SEQUENCE_ID;
     // Count number of keys printed so far
     long count = 0;
-    while (withinLimit(count) && iterator.get().isValid()) {
-      StringBuilder sb = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDef.getKeyCodec()
-            .fromPersistedFormat(iterator.get().key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        if (schemaV3) {
-          int index =
-              DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
-          String keyStr = key.toString();
-          if (index > keyStr.length()) {
-            err().println("Error: Invalid SchemaV3 table key length. "
-                + "Is this a V2 table? Try again with --dn-schema=V2");
-            return false;
-          }
-          String cid = keyStr.substring(0, index);
-          String blockId = keyStr.substring(index);
-          sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
-              FixedLengthStringCodec.string2Bytes(cid)) +
-              keySeparatorSchemaV3 +
-              blockId));
-        } else {
-          sb.append(gson.toJson(key));
-        }
-        sb.append(": ");
-      }
-
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      Object o = dbColumnFamilyDef.getValueCodec()
-          .fromPersistedFormat(iterator.get().value());
-      sb.append(gson.toJson(o));
-
+    List<Future<Void>> futures = new ArrayList<>();
+    while (withinLimit(count) && iterator.get().isValid() && !exception) {
+      batch.add(new ByteArrayKeyValue(
+          iterator.get().key(), iterator.get().value()));
       iterator.get().next();
-      ++count;
-      if (withinLimit(count) && iterator.get().isValid()) {
-        // If this is not the last entry, append comma
-        sb.append(", ");
+      count++;
+      if (batch.size() >= BATCH_SIZE) {
+        //
+        while (logWriter.getInflightLogCount() > THREAD_COUNT * 10

Review Comment:
   Added a bound to the thread pool's task queue. There are now two safeguards:
   - The bounded `LinkedBlockingQueue` in the thread pool prevents too many tasks from accumulating in the pool.
   - `getInflightLogCount()` prevents `LogWriter` from accumulating too many outstanding tasks.
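
The minimal sketch below shows the two safeguards together using only `java.util.concurrent`. It assumes a `ThreadPoolExecutor` with a bounded `LinkedBlockingQueue` and `CallerRunsPolicy`: when the queue is full, the submitting thread runs the task itself, which slows the producer instead of growing memory. An in-flight counter then pauses the producer while the writer falls behind. `BoundedPoolSketch`, `QUEUE_CAPACITY`, the value of `THREAD_COUNT`, and the `inflight` counter (standing in for `LogWriter.getInflightLogCount()`) are hypothetical. The queue capacity the PR actually chose is not shown in this thread. Only the `THREAD_COUNT * 10` threshold is taken from the hunk above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class BoundedPoolSketch {
  // Hypothetical limits for this sketch; the PR's values may differ.
  static final int THREAD_COUNT = 4;
  static final int QUEUE_CAPACITY = THREAD_COUNT * 2;
  static final int MAX_INFLIGHT = THREAD_COUNT * 10;

  /** Results handed to workers but not yet written (like getInflightLogCount()). */
  static final AtomicInteger inflight = new AtomicInteger();

  static ExecutorService newBoundedPool() {
    return new ThreadPoolExecutor(
        THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
        // First safeguard: at most QUEUE_CAPACITY tasks wait for a worker.
        new LinkedBlockingQueue<>(QUEUE_CAPACITY),
        // When the queue is full, the submitting thread runs the task itself.
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  /** Produces `batches` results through the pool and returns their sum. */
  public static long process(int batches) {
    ExecutorService pool = newBoundedPool();
    LinkedBlockingQueue<Long> results = new LinkedBlockingQueue<>();
    AtomicLong written = new AtomicLong();
    // Single consumer thread, standing in for LogWriter.
    Thread writer = new Thread(() -> {
      try {
        for (int i = 0; i < batches; i++) {
          written.addAndGet(results.take());
          inflight.decrementAndGet(); // one result has been written
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    writer.setDaemon(true);
    writer.start();
    try {
      for (int b = 0; b < batches; b++) {
        // Second safeguard: pause while too many results await the writer.
        while (inflight.get() > MAX_INFLIGHT) {
          Thread.sleep(1);
        }
        final long id = b;
        inflight.incrementAndGet();
        // Stand-in for formatting one batch on a worker thread.
        pool.execute(() -> results.add(id));
      }
      writer.join();
      return written.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted", e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(process(100)); // prints 4950 (0 + 1 + ... + 99)
  }
}
```

The bounded queue limits memory used by tasks that have not started yet. The in-flight counter limits results that have been computed but not yet written. With only one of the two, the other buffer could still grow without limit.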



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

